feat(sim): add a new device to simulate ramp ups

This commit is contained in:
2026-02-05 09:09:24 +01:00
parent 279e99cb01
commit 579484ae3d

View File

@@ -217,6 +217,111 @@ class SynFlyerLamNI(Device, PositionerBase):
flyer.start()
class SimDeviceWithSignalDelay(Device):
"""
Simulated device with a delay. Can be used to simulate a slow rampup after an initial delay.
One use case is to simulate a beam loss and its recovery over time.
"""
start = Cpt(SetableSignal, value=0, kind=Kind.config)
min_val = Cpt(SetableSignal, value=0, kind=Kind.config)
max_val = Cpt(SetableSignal, value=10, kind=Kind.config)
delay = Cpt(SetableSignal, value=1.0, kind=Kind.config) # Delay in seconds before rampup starts
rampup_time = Cpt(SetableSignal, value=5.0, kind=Kind.config) # Total rampup time in seconds
value = Cpt(SetableSignal, value=0, kind=Kind.hinted) # Current value signal
SUB_VALUE = "value"
_default_sub = SUB_VALUE
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._rampup_thread = None
self._stop_event = threading.Event()
self._running = False
# Subscribe to start signal changes
self.start.subscribe(self._on_start_changed)
self.value.subscribe(self._on_value_changed)
def _on_start_changed(self, value, **kwargs):
"""Callback when start signal changes."""
if value == 1 and not self._running:
self._start_rampup()
def _on_value_changed(self, value, **kwargs):
self._run_subs(self.SUB_VALUE, value=value, **kwargs)
def _start_rampup(self):
"""Start the rampup simulation in a background thread."""
if self._running:
return
self._stop_event.clear()
self._running = True
self._rampup_thread = threading.Thread(target=self._rampup_worker, daemon=True)
self._rampup_thread.start()
# Reset start signal to 0 after launching
self.start.put(0)
def _stop_rampup(self):
"""Stop the rampup simulation."""
if not self._running:
return
self._stop_event.set()
if self._rampup_thread is not None:
self._rampup_thread.join(timeout=1.0)
self._running = False
self.value.put(self.min_val.get())
def _rampup_worker(self):
"""Worker thread that handles delay and rampup."""
try:
min_value = self.min_val.get()
max_value = self.max_val.get()
delay_time = self.delay.get()
rampup_duration = self.rampup_time.get()
# Wait for delay period
if self._stop_event.wait(timeout=delay_time):
return # Stopped during delay
# Set initial value to min_val
self.value.put(min_value)
# Rampup phase
start_time = ttime.time()
update_interval = 0.1 # Update every 100ms
while not self._stop_event.is_set():
elapsed = ttime.time() - start_time
if elapsed >= rampup_duration:
# Rampup complete
self.value.put(max_value)
break
# Calculate current value based on linear interpolation
progress = elapsed / rampup_duration
current_value = min_value + (max_value - min_value) * progress
self.value.put(current_value)
# Wait for next update or stop signal
if self._stop_event.wait(timeout=update_interval):
break
except Exception as exc:
logger.error(f"Error in rampup worker for {self.name}: {exc}")
finally:
self._running = False
def stop(self, success=False):
"""Stop the device and any ongoing rampup."""
self._stop_rampup()
super().stop(success=success)
class SimPositionerWithCommFailure(SimPositioner):
fails = Cpt(SetableSignal, value=0)
@@ -425,5 +530,14 @@ class SimCameraWithPSIComponents(SimCamera):
if __name__ == "__main__":
cam = SimCameraWithPSIComponents(name="cam")
cam.read()
ring = SimDeviceWithSignalDelay(name="ring")
ring.min_val.put(300)
ring.max_val.put(400)
ring.value.put(400)
ring.delay.put(5)
ring.rampup_time.put(20)
ring.subscribe(lambda *args, **kwargs: print(f"Value changed to {kwargs['value']}"))
ring.start.put(1)
while ring._running:
ttime.sleep(0.5)
print("Rampup complete")