From 579484ae3d7ac372259de6e645998f786c95419a Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Thu, 5 Feb 2026 09:09:24 +0100 Subject: [PATCH] feat(sim): add a new device to simulate ramp ups --- ophyd_devices/sim/sim_test_devices.py | 118 +++++++++++++++++++++++++- 1 file changed, 116 insertions(+), 2 deletions(-) diff --git a/ophyd_devices/sim/sim_test_devices.py b/ophyd_devices/sim/sim_test_devices.py index 635b095..a605bfc 100644 --- a/ophyd_devices/sim/sim_test_devices.py +++ b/ophyd_devices/sim/sim_test_devices.py @@ -217,6 +217,111 @@ class SynFlyerLamNI(Device, PositionerBase): flyer.start() +class SimDeviceWithSignalDelay(Device): + """ + Simulated device with a delay. Can be used to simulate a slow rampup after an initial delay. + One use case is to simulate a beam loss and its recovery over time. + """ + + start = Cpt(SetableSignal, value=0, kind=Kind.config) + min_val = Cpt(SetableSignal, value=0, kind=Kind.config) + max_val = Cpt(SetableSignal, value=10, kind=Kind.config) + delay = Cpt(SetableSignal, value=1.0, kind=Kind.config) # Delay in seconds before rampup starts + rampup_time = Cpt(SetableSignal, value=5.0, kind=Kind.config) # Total rampup time in seconds + value = Cpt(SetableSignal, value=0, kind=Kind.hinted) # Current value signal + + SUB_VALUE = "value" + _default_sub = SUB_VALUE + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._rampup_thread = None + self._stop_event = threading.Event() + self._running = False + + # Subscribe to start signal changes + self.start.subscribe(self._on_start_changed) + self.value.subscribe(self._on_value_changed) + + def _on_start_changed(self, value, **kwargs): + """Callback when start signal changes.""" + if value == 1 and not self._running: + self._start_rampup() + + def _on_value_changed(self, value, **kwargs): + self._run_subs(self.SUB_VALUE, value=value, **kwargs) + + def _start_rampup(self): + """Start the rampup simulation in a background thread.""" + if self._running: + return + + self._stop_event.clear() + self._running = True + self._rampup_thread = threading.Thread(target=self._rampup_worker, daemon=True) + self._rampup_thread.start() + + # Reset start signal to 0 after launching + self.start.put(0) + + def _stop_rampup(self): + """Stop the rampup simulation.""" + if not self._running: + return + + self._stop_event.set() + if self._rampup_thread is not None: + self._rampup_thread.join(timeout=1.0) + self._running = False + self.value.put(self.min_val.get()) + + def _rampup_worker(self): + """Worker thread that handles delay and rampup.""" + try: + min_value = self.min_val.get() + max_value = self.max_val.get() + delay_time = self.delay.get() + rampup_duration = self.rampup_time.get() + + # Wait for delay period + if self._stop_event.wait(timeout=delay_time): + return # Stopped during delay + + # Set initial value to min_val + self.value.put(min_value) + + # Rampup phase + start_time = ttime.time() + update_interval = 0.1 # Update every 100ms + + while not self._stop_event.is_set(): + elapsed = ttime.time() - start_time + + if elapsed >= rampup_duration: + # Rampup complete + self.value.put(max_value) + break + + # Calculate current value based on linear interpolation + progress = elapsed / rampup_duration + current_value = min_value + (max_value - min_value) * progress + self.value.put(current_value) + + # Wait for next update or stop signal + if self._stop_event.wait(timeout=update_interval): + break + + except Exception as exc: + logger.error(f"Error in rampup worker for {self.name}: {exc}") + finally: + self._running = False + + def stop(self, success=False): + """Stop the device and any ongoing rampup.""" + self._stop_rampup() + super().stop(success=success) + + class SimPositionerWithCommFailure(SimPositioner): fails = Cpt(SetableSignal, value=0) @@ -425,5 +530,14 @@ class SimCameraWithPSIComponents(SimCamera): if __name__ == "__main__": - cam = SimCameraWithPSIComponents(name="cam") - cam.read() + ring = SimDeviceWithSignalDelay(name="ring") + ring.min_val.put(300) + ring.max_val.put(400) + ring.value.put(400) + ring.delay.put(5) + ring.rampup_time.put(20) + ring.subscribe(lambda *args, **kwargs: print(f"Value changed to {kwargs['value']}")) + ring.start.put(1) + while ring._running: + ttime.sleep(0.5) + print("Rampup complete")