# gevent.monkey.patch_all() must be called in the main program!
"""Flask/gevent web server delivering live updates (SSE) and history data.

NOTE(review): in the reviewed copy of this file the line structure was
collapsed and the HTML markup inside string literals was missing.  The
Python logic below follows the visible tokens; HTML tags and ``<file>``
route placeholders are reconstructions and are marked where they occur.
"""

import sys
import time
import signal
import socket
import traceback
import logging
import json
import gevent
import gevent.pywsgi
import gevent.queue
import flask
import circularlog

# instrument name -> port of the web server running on that instrument
instruments = {ins: 8642 for ins in
               ['amor', 'boa', 'camea', 'dmc', 'eiger', 'focus', 'hrpt', 'sans', 'tasp', 'zebra']
               }

# filename suffix -> mimetype; anything else is served as text/html
_MIMETYPES = {
    '.js': 'text/javascript',
    '.css': 'text/css',
    '.ico': 'image/x-icon',
    '.png': 'image/png',
}


def guess_mimetype(filename):
    """Return the mimetype for *filename* based on its suffix (default: text/html)."""
    for suffix, mimetype in _MIMETYPES.items():
        if filename.endswith(suffix):
            return mimetype
    return 'text/html'


class MyEncoder(json.JSONEncoder):
    """JSON encoder falling back to ``int(obj)`` for otherwise unserializable objects."""

    def default(self, obj):
        try:
            return super().default(obj)
        except TypeError:
            return int(obj)  # try to convert SECoP Enum


# SSE 'protocol' is described here: https://bit.ly/UPFyxY
def to_json_sse(msg):
    """Serialize *msg* to JSON and wrap it as one server-sent-event data frame."""
    txt = json.dumps(msg, separators=(',', ': '), cls=MyEncoder)
    logging.debug('data: %s', txt)
    return 'data: %s\n\n' % txt


class Server:
    """singleton: created once in this module"""
    interactor_classes = None
    client_cls = None
    history_cls = None
    history = None
    single_instrument = None
    db = None

    def __init__(self):
        self.instruments = {}
        self.clients = {}

    def remove(self, client):
        """Close *client* (if it has a close method) and forget it.

        A client which is not registered (anymore) is only logged.
        """
        try:
            try:
                self.clients[client.id].close()
            except AttributeError:
                pass
            del self.clients[client.id]
        except KeyError:
            logging.warning('client already removed %s', client.id)

    def lookup_streams(self, instrument, stream=None, device=None):
        """Resolve the streams, tags and device names for a request.

        :param instrument: instrument name or None (overridden by
            ``self.single_instrument`` when that is set)
        :param stream: comma separated stream names, a list of stream names or None
        :param device: comma separated device names or None
        :return: tuple (list of streams, tags dict, comma separated device names)
        """
        if self.single_instrument:
            instrument = self.single_instrument
        if not stream:
            streams = []
        elif isinstance(stream, str):
            streams = stream.split(',')
        else:
            # work on a copy: the list is extended below and must not
            # modify the argument of the caller
            streams = list(stream)
        device_names = devices = device.split(',') if device else []
        tags = {}
        if instrument:
            # tags['instrument'] = instrument
            stream_dict = self.db.get_streams(instrument, stream=list(streams), device=devices)
            streams.extend(s for s in stream_dict if s not in streams)
            if not devices:
                device_names = list(filter(None, (t.get('device') for t in stream_dict.values())))
        if streams:
            tags['stream'] = streams[0] if len(streams) == 1 else streams
        if devices:
            tags['device'] = devices[0] if len(devices) == 1 else devices
        return streams, tags, ','.join(device_names)

    def register_client(self, instrument=None, stream=None, device=None, history_only=None):
        """Create a client with attached history handlers and register it by its id.

        When *history_only* is given and not '0', a dummy client without
        streams is created.
        """
        streams, tags, device_name = self.lookup_streams(instrument, stream, device)
        if (history_only or '0') != '0':
            # create dummy client
            client = self.client_cls(self, [], '', '')
        else:
            client = self.client_cls(self, streams, instrument or '', device_name)
        history = self.history_cls(self, instrument, device_name, tags)
        # history.db.debug = True
        # all relevant methods of the history instance are saved in client.handlers
        # so there is no reference needed to history anymore
        client.handlers.update(history.handlers)
        self.clients[client.id] = client
        return client

    def run(self, port, db, history_cls, client_cls, single_instrument=None, **interactor_classes):
        """Configure the singleton and logging, then serve forever on *port*.

        SIGTERM stops and closes the WSGI server.
        """
        self.single_instrument = single_instrument
        self.db = db
        self.history_cls = history_cls
        self.client_cls = client_cls
        self.interactor_classes = interactor_classes
        app.debug = True

        logging.basicConfig(filename=f'logfile{port}.log', filemode='w', level=logging.INFO,
                            format='%(asctime)s %(levelname)s %(message)s', force=True)

        # srv = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt')
        srv = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server'))

        def handle_term(sig, frame):
            srv.stop()
            srv.close()

        signal.signal(signal.SIGTERM, handle_term)

        # def handle_pdb(sig, frame):
        #     import pdb
        #     print('PDB')
        #     pdb.Pdb().set_trace(frame)
        # signal.signal(signal.SIGUSR1, handle_pdb)

        srv.serve_forever()


server = Server()

app = flask.Flask(__name__)

update_rider = circularlog.Rider("upd")
pollinterval = 0.2


@app.route('/update')
def get_update(_=None):
    """Register a client and return its server-sent-event stream."""
    kwargs = {k: flask.request.values.get(k)
              for k in ('instrument', 'stream', 'device', 'history_only')}
    client = server.register_client(**kwargs)
    # resolve the client address once (the lookup may block)
    remote_host = socket.getfqdn(flask.request.remote_addr.split(':')[-1])
    client.remote_info = circularlog.strtm() + " " + remote_host

    @flask.stream_with_context
    def generator():
        logging.info('UPDATE %s %s', client.id, remote_host)
        # msg = dict(type='id', id=client.id, title=instrument.title);
        # yield to_json_sse(msg)
        msg = dict(type='id', id=client.id,
                   instrument=kwargs.get('instrument') or server.single_instrument or 'n_a',
                   device=client.device_name)
        yield to_json_sse(msg)
        try:
            lastmsg = time.time()
            while True:
                if client.info() == "":
                    logging.debug('client %s: empty info after %s s',
                                  client.id, time.time() - lastmsg)
                messages = client.poll()
                for msg in messages:
                    update_rider.put('-', repr(msg))
                    yield to_json_sse(msg)
                delay = pollinterval
                if messages:
                    lastmsg = time.time()
                    # as we had messages, we might try again immediately
                    delay = 0.001
                elif time.time() > lastmsg + 30:
                    if not client.info():
                        raise GeneratorExit("no activity")
                    logging.info('HEARTBEAT %s (%s)', client.id, "; ".join(client.info()))
                    yield to_json_sse(dict(type='heartbeat'))
                    lastmsg = time.time()
                # we need to specify a value > 0 to give other greenlets a chance to continue
                gevent.sleep(delay)
        except GeneratorExit as e:
            logging.info("except clause %r", repr(e))
            logging.info('CLOSED %s', client.id)
            server.remove(client)
        except Exception:
            logging.info('error')
            logging.error('%s', traceback.format_exc())
            server.remove(client)
            # msg = dict(type='error',error=traceback.format_exc())
            # yield to_json_sse(msg)

    resp = flask.Response(generator(), mimetype='text/event-stream')
    resp.headers['Access-Control-Allow-Origin'] = '*'
    return resp


@app.route('/circular')
def dump_circular():
    """Trigger a dump of the circular log."""
    circularlog.log()
    return "log"


@app.route('/clients')
def show_clients():
    """Return one line per registered client: remote info and client info."""
    # NOTE(review): the line separator was lost in the reviewed copy,
    # '<br>' is assumed - confirm
    return "".join(c.remote_info + " " + "; ".join(c.info()) + "<br>"
                   for c in server.clients.values())


@app.route('/export')
def export():
    """Send the result of the client's 'export' handler as a TSV attachment.

    On any failure a JSON error message is returned instead.
    """
    args = flask.request.args
    kwargs = {k: args.get(k) for k in args}
    path = flask.request.path
    logging.info('GET %s %s', path, repr(kwargs))
    try:
        client_id = kwargs.pop('id')
        client = server.clients[client_id]
        data = client.handlers['export'](**kwargs)
        return flask.send_file(
            data,
            as_attachment=True,
            download_name='export.tsv',
            mimetype='text/tab-separated-values'
        )
    except Exception as e:
        logging.error('%s', traceback.format_exc())
        circularlog.log()
        msg = dict(type='error', request=path[1:], error=repr(e))
        logging.error('MSG: %r', msg)
        resp = flask.Response(json.dumps(msg), mimetype='application/json')
        resp.headers['Access-Control-Allow-Origin'] = '*'
        return resp


@app.route('/getblock')
@app.route('/updateblock')
@app.route('/sendcommand')
@app.route('/console')
@app.route('/graph')
@app.route('/updategraph')
@app.route('/gettime')
@app.route('/getvars', methods=["GET", "POST"])
def reply():
    """Dispatch the request to the client handler named like the request path.

    The request values (without 'id') are passed as keyword arguments; the
    handler result, or an error message on failure, is returned as JSON.
    """
    args = flask.request.values
    kwargs = {k: args.get(k) for k in args}
    path = flask.request.path
    logging.info('GET %s %r', path, kwargs)
    try:
        client_id = kwargs.pop('id')
        client = server.clients[client_id]
        msg = client.handlers[path[1:]](**kwargs)
    except Exception as e:
        logging.error('%s', traceback.format_exc())
        circularlog.log()
        msg = dict(type='error', request=path[1:], error=repr(e))
    jsonmsg = json.dumps(msg)
    if len(jsonmsg) < 120:
        logging.info('REPLY %s %s', path, jsonmsg)
    else:
        logging.info('REPLY %s %s...', path, jsonmsg[:80])
        logging.debug('REPLY %s %r', path, jsonmsg)
    resp = flask.Response(jsonmsg, mimetype='application/json')
    resp.headers['Access-Control-Allow-Origin'] = '*'
    return resp


# NOTE(review): the '<file>' placeholders of the file serving routes below were
# missing in the reviewed copy; they are implied by the 'file' view argument
@app.route('/test/<file>')
def subdir_test_file(file):
    """Serve a file from client/test/ after a delay of 2 seconds."""
    gevent.sleep(2)
    resp = flask.send_file("client/test/" + file, mimetype=guess_mimetype(file))
    return resp


@app.route('/components/curves_settings_popup/color_selector/<file>')
@app.route('/components/curves_settings_popup/<file>')
@app.route('/components/action_entry/<file>')
@app.route('/components/export_popup/<file>')
@app.route('/components/dates_popup/<file>')
@app.route('/components/menu_popup/<file>')
@app.route('/components/help_popup/<file>')
@app.route('/components/help_entry/<file>')
@app.route('/components/control/<file>')
@app.route('/components/divider/<file>')
@app.route('/components/states_indicator/dates/<file>')
@app.route('/res/<file>')
@app.route('/jsFiles/<file>')
@app.route('/cssFiles/<file>')
@app.route('/externalFiles/<file>')
def subdir_file(file):
    """Serve *file* from the subdirectory of client/ given by the request path."""
    subdir = "/".join(flask.request.path.split("/")[1:-1])
    resp = flask.send_file("client/" + subdir + "/" + file, mimetype=guess_mimetype(file))
    # resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';"
    return resp


@app.route('/externalFiles/maps/<file>.map')
def replace_by_empty(file):
    """Answer requests for source maps with an empty body."""
    return ""


@app.route('/')
def default():
    """Serve the web client, or the experiment selection when nothing is selected."""
    if not any(flask.request.values.get(k) for k in ('instrument', 'stream', 'device')):
        if not server.single_instrument:
            return select_experiment()
    return general_file('SEAWebClient.html')


# NOTE(review): HTML markup in this disabled view was lost in the reviewed
# copy and is reconstructed from the remaining text
# @app.route('/select_instrument')
# def select_instrument():
#     out = ['''<html><body><table>
# <tr><th>instrument</th><th>devices</th></tr>''']
#     result = {}
#     for stream, tags in server.db.get_streams().items():
#         ins = tags.get('instrument', '0')
#         result.setdefault(ins, []).append((stream, tags.get('device')))
#     bare_streams = result.pop('0', [])
#     for ins, streams in result.items():
#         out.append(f'<tr><td>{ins}</td><td>')
#         out.extend(f'{d or s} ' for s, d in streams)
#         out.append('</td></tr>')
#     for stream, device in bare_streams:
#         out.append(f'<tr><td>{stream}</td><td>{device}</td></tr>')
#     out.append('</table>')
#     out.append('<h3>servers on the instruments:</h3>')
#     out.extend([f"{i} \n" for i in instlist])
#     out.extend(['</body>', '</html>'])
#     return '\n'.join(out)


@app.route('/select_experiment')
def select_experiment():
    """Return an HTML page listing running and past experiments.

    The optional request value 'time' selects the period: 'all', or
    '<start>,<end>' in seconds where '0' means no limit.  Without it the
    last 30 days are shown.

    NOTE(review): the HTML tags in the string literals of this function were
    missing in the reviewed copy.  They are reconstructed from the remaining
    text and the order of the statements - confirm against the deployed page.
    """
    out = ['''<html><body><table>
''']
    ONEMONTH = 30 * 24 * 3600

    out.append('<br>direct link to instruments:<br>')
    out.extend([f'<a href="http://{ins}.psi.ch:{port}">{ins.upper()}</a> \n'
                for ins, port in instruments.items()])
    if server.db.has_local:
        out.append('<h3>linse-c (central)</h3>')

    class prev:  # just a namespace
        title = None
        legend = None

    def change_title(text):
        """Start a new section titled *text*; return False if it is the current one."""
        if text == prev.title:
            return False
        if prev.legend:
            # the legend of the previous section is shown at its end
            out.append(f'{prev.legend}')
            prev.legend = None
        prev.title = text
        out.append(f'<tr><td colspan=3><br><i>{text}:</i></td></tr>')
        return True

    # TODO: sort this by (instrument / device) and list dates
    # period format: Ymd..Ymd, Ymd (single date), Ymd..now, HM..now

    try:
        now = time.time()
        timerange = flask.request.values.get('time')
        if timerange == 'all':
            starttime, endtime = None, None
        elif timerange:
            timerange = timerange.split(',')
            starttime, endtime = [None if timerange[i] == '0' else int(timerange[i])
                                  for i in (0, -1)]
        else:
            starttime, endtime = now - ONEMONTH, now

        chunk_list = []
        for key, chunk_dict in server.db.get_experiments(starttime, endtime).items():
            for (streams, devices), chunks in chunk_dict.items():
                chunk_list.extend((r[1], r[0], key, devices) for r in chunks)
        # latest end time first
        chunk_list.sort(reverse=True)

        today = time.strftime("%Y-%m-%d", time.localtime(now))
        for end, beg, key, devices in chunk_list:
            begdate, enddate = (time.strftime("%Y-%m-%d", time.localtime(t))
                                for t in (beg, end))
            port = None
            if key[0] == 'instrument':
                ins = key[1]
                port = instruments.get(ins)
                left = ins.upper()
            else:
                left = key[1]  # shown in left column
            args = ['='.join(key)]
            remote = None if port is None else f'http://{ins}.psi.ch:{port}'
            history_only = bool(remote)
            if end > now:
                if begdate == today:
                    daterange = f'since {time.strftime("%H:%M", time.localtime(beg))}'
                else:
                    daterange = f'since {begdate}'
                change_title('currently running')
            else:
                args.append(f'time={beg},{end}')
                history_only = True
                remote = None
                daterange = begdate if begdate == enddate else f'{begdate}...{enddate}'
                if end > now - ONEMONTH:
                    change_title('recently running (history graphics only)')
                else:
                    change_title('older than 30 days')
            if history_only:
                args.append('hr=1')
            query = '&'.join(args)

            def link(label):
                return f'<a href="/?{query}">{label}</a>'

            label = " ".join(devices)
            if remote:
                prev.legend = ('<tr><td colspan=3>'
                               'linse-c*: history graphics only</td></tr>')
                out.append(f'<tr><th><a href="{remote}">{ins.upper()}</a></th><td>{label}</td>'
                           f'<td>{link("linse-c*")}</td>')
            else:
                out.append(f'<tr><th>{link(left)}</th><td>{label}</td>')
            out.append(f'<td>{daterange}</td></tr>')
        if timerange:
            # NOTE(review): the link target is a guess - confirm
            out.append('<h3><a href="/select_experiment?time=all">earlier dates</a></h3><br>')
        out.append('</table>')
        out.extend(['</body>', '</html>'])
    except Exception as e:
        logging.error('%s', traceback.format_exc())
        circularlog.log()
        out = [f"ERROR {e!r}"]

    return '\n'.join(out)


@app.route('/<file>')
def general_file(file):
    """Serve *file* from client/, or a plain message when it does not exist."""
    subdir = "client/"
    try:
        resp = flask.send_file(subdir + file, mimetype=guess_mimetype(file))
    except FileNotFoundError:
        logging.warning('file %s not found', file)
        return 'file not found'
    # resp.headers['Content-Security-Policy'] = "sandbox; script-src 'unsafe-inline';"
    return resp


def hostport_split(hostport):
    """Split 'host:port' into the tuple (host, int(port))."""
    h = hostport.split(':')
    return (h[0], int(h[1]))