From 1201801170b0f856a7c0945e4fa2441725746004 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Mon, 4 May 2026 13:06:03 +0200 Subject: [PATCH] break busy loop blocking the server - Use gevent.sleep with a tiny value after a message has been received instead of no call to sleep at all. This was the reason for the webserver to block when used with prep8 and the leiden dil with a lot of data - move gevent.monkey.patch_all() to secop-webserver - properly close clients --- base.py | 23 ++++++++++++++++++++--- secop-webserver | 2 ++ webserver.py | 18 +++++++++++------- 3 files changed, 33 insertions(+), 10 deletions(-) mode change 100755 => 100644 webserver.py diff --git a/base.py b/base.py index 51208b4..b01d73e 100644 --- a/base.py +++ b/base.py @@ -41,6 +41,7 @@ class HandlerBase: class Client(HandlerBase): + """a generic parameter client""" dictionary = {} # dict (kind, uri) of node def __init__(self, server, streams, instrument_name, device_name): @@ -54,9 +55,7 @@ class Client(HandlerBase): for uri in streams: urisplit = uri.rsplit('://') kind = urisplit[0] if len(urisplit) == 2 else 'secop' - node = self.dictionary.get((kind, uri)) - if node is None: def change_callback(dictionary=self.dictionary, kind_uri=(kind, uri)): @@ -72,6 +71,21 @@ class Client(HandlerBase): self.instrument_name = instrument_name self.device_name = device_name # do not know if this is needed + def close(self): + """remove unused nodes from dictionary + + The last one turns off the lights + """ + for cl in self.server.clients.values(): + if cl is not self and isinstance(cl, Client): + for uri in cl.nodes: + # this node is still used + self.nodes.pop(uri, None) + # remaining in self.nodes are to be removed from dictionary + for uri, node in self.nodes.items(): + print(f'disconnect {uri}') + self.dictionary.pop(uri, None) + def poll(self): updates = sum((n.get_updates(self.cache, self.update_filter) for n in self.nodes.values()), start=[]) result = [dict(type='update', updates=updates)] if updates else [] @@ -87,7 +101,10 @@ class Client(HandlerBase): for node in self.nodes.values(): node.add_main_components(components) return dict(type='draw', path='main', title='modules', components=components) - node = self.node_map[path] + try: + node = self.node_map[path] + except KeyError: + raise RuntimeError(f'ERROR path={path}') return dict(type='draw', path=path, title=path, components=node.get_components(path)) def w_updateblock(self, path): diff --git a/secop-webserver b/secop-webserver index f187b4c..0751f7c 100755 --- a/secop-webserver +++ b/secop-webserver @@ -1,4 +1,6 @@ #!/usr/bin/env python +from gevent import monkey +monkey.patch_all() import sys # look for sehistory and frappy diff --git a/webserver.py b/webserver.py old mode 100755 new mode 100644 index c51d912..0196da6 --- a/webserver.py +++ b/webserver.py @@ -1,5 +1,4 @@ -from gevent import monkey -monkey.patch_all() +# gevent.monkey.patch_all() must be called in the main program! import sys import time import signal @@ -13,7 +12,6 @@ import gevent.queue import flask import circularlog - instruments = {ins: 8642 for ins in ['amor', 'boa', 'camea', 'dmc', 'eiger', 'focus', 'hrpt', 'sans', 'tasp', 'zebra'] } @@ -63,6 +61,10 @@ class Server: def remove(self, client): try: + try: + self.clients[client.id].close() + except AttributeError: + pass del self.clients[client.id] except KeyError: logging.warning('client already removed %s', client.id) @@ -114,9 +116,8 @@ class Server: self.interactor_classes = interactor_classes app.debug = True - logging.basicConfig(filename=f'logfile{port}.log', filemode='w', level=logging.INFO, - format='%(asctime)s %(levelname)s %(message)s') + format='%(asctime)s %(levelname)s %(message)s', force=True) # srv = gevent.wsgi.WSGIServer(('', port), app, keyfile='key.key', certfile='key.crt') srv = gevent.pywsgi.WSGIServer(('', port), app, log=logging.getLogger('server')) @@ -169,8 +170,11 @@ def get_update(_=None): for msg in messages: update_rider.put('-', repr(msg)) yield to_json_sse(msg) + delay = pollinterval if messages: lastmsg = time.time() + # as we had messages, we might try again immediately + delay = 0.001 else: if time.time() > lastmsg + 30: if not client.info(): @@ -178,8 +182,8 @@ def get_update(_=None): logging.info('HEARTBEAT %s (%s)', client.id, "; ".join(client.info())) yield to_json_sse(dict(type='heartbeat')) lastmsg = time.time() - else: - gevent.sleep(pollinterval) + # we need to specify a value > 0 to give other greenlets a chance to continue + gevent.sleep(delay) except GeneratorExit as e: logging.info("except clause %r", repr(e)) logging.info('CLOSED %s', client.id)