scan fixes example data

This commit is contained in:
Henrik Lemke
2026-03-28 21:11:32 +01:00
parent bc397711b8
commit ca809d5a8a
4 changed files with 120 additions and 2 deletions
+2
View File
@@ -20,6 +20,8 @@ from epics import PV
from ..acquisition.utilities import Acquisition
from ..elements.assembly import Assembly
from ..utilities.path_alias import PathAlias
# from ..acquisition.decorators import scannable
import inputimeout
from IPython import get_ipython
from os.path import relpath
+2 -1
View File
@@ -1,6 +1,6 @@
import weakref
from eco.acquisition.counters import CounterValue
from eco.acquisition import scan
# from eco.acquisition import scan
# from lazy_object_proxy import Proxy as LazyProxy
@@ -8,6 +8,7 @@ from eco.acquisition import scan
def scannable(Obj):
@property
def scans(self):
from eco.acquisition import scan # moved import here to avoid circular import
if hasattr(self, "_counter"):
if not hasattr(self, "_old_counters"):
self._old_counters = []
+54 -1
View File
@@ -1,4 +1,6 @@
from copy import deepcopy
from threading import Thread
from eco.acquisition.decorators import scannable
from eco.elements.adjustable import (
AdjustableMemory,
default_representation,
@@ -76,13 +78,19 @@ class DetectorVirtual(Assembly):
@call_convenience
@value_property
@default_representation
@scannable
class DetectorGet:
def __init__(self, foo_get, cache_get_seconds=None, name=None):
def __init__(
self, foo_get, cache_get_seconds=None, monitor_frequency=None, name=None
):
""" """
self.alias = Alias(name)
self.name = name
self._get = foo_get
self._cache_get_seconds = cache_get_seconds
self._accumulate_frequency = monitor_frequency
if monitor_frequency:
self.set_current_value_callback = self._set_current_value_callback
def get_current_value(self):
ts = time.time()
@@ -97,6 +105,11 @@ class DetectorGet:
self._get_cache = (ts, value)
return value
def _set_current_value_callback(self):
return CallbackTimedelta(
self, frequency=self._accumulate_frequency, func="accumulate"
)
@call_convenience
@value_property
@@ -118,3 +131,43 @@ class DetectorMemory:
cv = self.get_current_value()
s = f"{name} at value: {cv}" + "\n"
return s
class CallbackTimedelta:
def __init__(
self, detector, frequency=10, func="accumulate", collector=None, run_once=True
):
self.detector = detector
self.frequency = frequency
# self.data = collector
if func == "accumulate":
func = self.accumulate_values
if collector is None:
collector = {"timestamps": [], "values": []}
self.data = (
collector # {"timestamps": [], "values": [], "timestamps_ioc": []}
)
self.foo = func
self.run_once = run_once
self.running = False
self.thread = None
def start(self, add_current_value=True):
if add_current_value:
ts_local = time.time()
self.data["timestamps"].append(ts_local)
self.data["values"].append(self.detector.get_current_value())
self.running = True
self.thread = Thread(target=self.accumulate_values)
self.thread.start()
def accumulate_values(self, *args, **kwargs):
while self.running:
ts_local = time.time()
self.data["timestamps"].append(ts_local)
self.data["values"].append(self.detector.get_current_value())
time.sleep(1 / self.frequency)
def stop(self):
self.running = False
self.thread.join()
+62
View File
@@ -0,0 +1,62 @@
import numpy as np
import random
import matplotlib.pyplot as plt
def _calc_0(lx):
r, h, k = 0.4, 0.5, 0.5
inner = r**2 - (lx - h) ** 2
if inner < 0:
return []
y_top = k + np.sqrt(inner)
y_bottom = k - np.sqrt(inner)
points = [y_top]
# 'e' has a gap in the bottom right arc
if lx < 0.8:
points.append(y_bottom)
# 'e' has a middle bar
if 0.15 <= lx <= 0.85:
points.append(0.5)
return points
def _calc_1(lx):
r, h, k = 0.4, 0.5, 0.5
inner = r**2 - (lx - h) ** 2
if inner < 0:
return []
# 'c' is open on the right side
if lx > 0.75:
return []
return [k + np.sqrt(inner), k - np.sqrt(inner)]
def _calc_2(lx):
r, h, k = 0.4, 0.5, 0.5
inner = r**2 - (lx - h) ** 2
if inner < 0:
return []
return [k + np.sqrt(inner), k - np.sqrt(inner)]
def get_sig_y(x):
if x < 0 or x > 1:
return 0.0
# Split the [0, 1] range into three segments
if x <= 1 / 3:
lx = x * 3
possible_y = _calc_0(lx)
elif x <= 2 / 3:
lx = (x - 1 / 3) * 3
possible_y = _calc_1(lx)
else:
lx = (x - 2 / 3) * 3
possible_y = _calc_2(lx)
ret = float(random.choice(possible_y)) if possible_y else 0.0
return np.random.normal(scale=0.05) + ret