added first version of a detector sim

This commit is contained in:
wakonig_k 2022-09-06 20:00:52 +02:00
parent a3b3efc4a5
commit d60b58a5f5

View File

@ -1,6 +1,7 @@
import threading
import time as ttime
import warnings
from typing import List
import numpy as np
from bec_utils import BECMessage, MessageEndpoints, bec_logger
@ -193,6 +194,78 @@ class SynAxisMonitor(Device):
self.readback.name = self.name
class _SLSDetectorAcquireSignal(Signal):
def put(self, value, *, timestamp=None, force=False):
self._readback = value
self.parent.set(value)
def get(self):
self._readback = self.parent.sim_state["acquire"]
return self.parent.sim_state["acquire"]
class _SLSDetectorConfigSignal(Signal):
def put(self, value, *, timestamp=None, force=False):
self._readback = value
self.parent.sim_state[self.name] = value
def get(self):
self._readback = self.parent.sim_state[self.name]
return self.parent.sim_state[self.name]
class SynSLSDetector(Device):
USER_ACCESS = []
acquire = Cpt(_SLSDetectorAcquireSignal, value=0, kind="hinted")
exp_time = Cpt(_SLSDetectorConfigSignal, name="exp_time", value=1, kind="config")
file_path = Cpt(_SLSDetectorConfigSignal, name="file_path", value="", kind="config")
file_pattern = Cpt(_SLSDetectorConfigSignal, name="file_pattern", value="", kind="config")
frames = Cpt(_SLSDetectorConfigSignal, name="frames", value=1, kind="config")
save_file = Cpt(_SLSDetectorConfigSignal, name="save_file", value=False, kind="config")
def __init__(self, *, name, kind=None, parent=None, **kwargs):
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim_state = {
"acquire": 0,
f"{self.name}_file_path": "",
f"{self.name}_file_pattern": "",
f"{self.name}_frames": 1,
f"{self.name}_save_file": False,
f"{self.name}_exp_time": 1,
}
self._stopped = False
def trigger(self):
status = DeviceStatus(self)
self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False)
def acquire():
try:
for _ in range(self.frames.get()):
ttime.sleep(self.exp_time.get())
if self._stopped:
raise DeviceStop
except DeviceStop:
pass
finally:
self._stopped = False
self._done_acquiring()
threading.Thread(target=acquire, daemon=True).start()
return status
def stage(self) -> List[object]:
return super().stage()
def unstage(self) -> List[object]:
return super().unstage()
def stop(self, *, success=False):
super().stop(success=success)
self._stopped = True
class DummyController:
USER_ACCESS = ["some_var", "controller_show_all"]
some_var = 10
@ -507,3 +580,8 @@ class SynAxisOPAAS(Device, PositionerBase):
def egu(self):
return "mm"
if __name__ == "__main__":
det = SynSLSDetector(name="moench")
det.trigger()