added cooldown to avoid checking for a non-existent file at 100 Hz

This commit is contained in:
2025-11-02 16:50:16 +01:00
parent 46fca4a6b5
commit 6a441e8c63
3 changed files with 88 additions and 0 deletions

View File

@@ -2,6 +2,8 @@ import functools
import importlib.util as ilu
from pathlib import Path
from .utils import cooldown
BASE = "/gpfs/photonics/swissfel/buffer/dap/custom_dap_scripts"
BASE = Path(BASE)
@@ -28,6 +30,7 @@ def calc_custom(results, image, pixel_mask):
results.update(res)
@cooldown(30)
@functools.cache
def load_custom(script):
beamline, name = script.split(":")

View File

@@ -1,4 +1,5 @@
from .cooldown import cooldown
from .npmemo import npmemo

View File

@@ -0,0 +1,84 @@
import functools
from time import time
class cooldown:
def __init__(self, seconds):
self.seconds = seconds
self.last_fail_time = 0
self.last_fail_exc = None
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
remaining = self.get_remaining()
if remaining > 0:
raise CooldownActive(func, args, kwargs, remaining) from self.last_fail_exc
try:
return func(*args, **kwargs)
except Exception as exc:
self.last_fail_time = time()
self.last_fail_exc = exc
raise exc
return wrapper
def get_remaining(self):
now = time()
elapsed = now - self.last_fail_time
return self.seconds - elapsed
class CooldownActive(Exception):
def __init__(self, func, args, kwargs, remaining):
call = format_call(func, args, kwargs)
msg = f"function call is on cooldown ({remaining:.3g}s remaining): {call}"
super().__init__(msg)
def format_call(func, args, kwargs):
fargs = [truncated_repr(i) for i in args]
fargs += [f"{k}={truncated_repr(v)}" for k, v in kwargs.items()]
fargs = ", ".join(fargs)
return f"{func.__name__}({fargs})"
def truncated_repr(x, length=10, ellipsis="..."):
thresh = 2 * length + len(ellipsis)
x = repr(x)
if len(x) <= thresh:
return x
return x[:length] + ellipsis + x[-length:]
if __name__ == "__main__":
from time import sleep
@cooldown(seconds=5)
def load(fn):
return open(fn).read()
print(load(__file__))
while True:
try:
load("does_not_exist.txt")
except Exception as e:
en = type(e).__name__
print(f"{en}: {e}")
sleep(0.1)