diff --git a/ophyd_devices/sim/sim_waveform.py b/ophyd_devices/sim/sim_waveform.py index ea4af98..f4b1cc2 100644 --- a/ophyd_devices/sim/sim_waveform.py +++ b/ophyd_devices/sim/sim_waveform.py @@ -4,6 +4,7 @@ import os import threading import time import traceback +from typing import Any import numpy as np from bec_lib import messages @@ -64,6 +65,8 @@ class SimWaveform(Device): ) # Can be extend or append async_update = Cpt(SetableSignal, value="append", kind=Kind.config) + slice_size = Cpt(SetableSignal, value=100, dtype=np.int32, kind=Kind.config) + slice_update = Cpt(SetableSignal, value=False, dtype=bool, kind=Kind.config) def __init__( self, @@ -94,6 +97,7 @@ class SimWaveform(Device): self.scan_info = scan_info if self.sim_init: self.sim.set_init(self.sim_init) + self._slice_index = 0 @property def registered_proxies(self) -> None: @@ -113,8 +117,23 @@ class SimWaveform(Device): def acquire(status: DeviceStatus): try: for _ in range(self.burst.get()): - self._run_subs(sub_type=self.SUB_MONITOR, value=self.waveform.get()) - self._send_async_update() + values = self.waveform.get() + if self.slice_update.get(): + size = self.slice_size.get() + mod = len(values) % size + num_slices = len(values) // size + int(mod > 0) + for i in range(num_slices): + value_slice = values[i * size : min((i + 1) * size, len(values))] + logger.info( + f"Sending slice {i} of {self._slice_index} with length {len(value_slice)}" + ) + self._run_subs(sub_type=self.SUB_MONITOR, value=value_slice) + self._send_async_update(index=self._slice_index, value=value_slice) + self._slice_index += 1 + else: + value = self.waveform.get() + self._run_subs(sub_type=self.SUB_MONITOR, value=value) + self._send_async_update(value=value) if self.stopped: raise DeviceStopError(f"{self.name} was stopped") status.set_finished() @@ -128,7 +147,7 @@ class SimWaveform(Device): self._trigger_thread.start() return status - def _send_async_update(self): + def _send_async_update(self, index: int | None = None, value: Any = None) -> None: """Send the async update to BEC.""" async_update_type = self.async_update.get() if async_update_type not in ["extend", "append"]: @@ -136,12 +155,21 @@ class SimWaveform(Device): waveform_shape = self.waveform_shape.get() if async_update_type == "append": - metadata = {"async_update": {"type": "add", "max_shape": [None, waveform_shape]}} + if index is not None: + metadata = { + "async_update": { + "type": "add_slice", + "index": index, + "max_shape": [None, waveform_shape], + } + } + else: + metadata = {"async_update": {"type": "add", "max_shape": [None, waveform_shape]}} else: metadata = {"async_update": {"type": "add", "max_shape": [None]}} msg = messages.DeviceMessage( - signals={self.waveform.name: {"value": self.waveform.get(), "timestamp": time.time()}}, + signals={self.waveform.name: {"value": value, "timestamp": time.time()}}, metadata=metadata, ) # logger.warning(f"Adding async update to {self.name} and {self.scan_info.msg.scan_id}") @@ -177,6 +205,7 @@ class SimWaveform(Device): self.exp_time.set(self.scan_info.msg.scan_parameters["exp_time"]) self.burst.set(self.scan_info.msg.scan_parameters["frames_per_trigger"]) self.stopped = False + self._slice_index = 0 logger.warning(f"Staged {self.name}, scan_id : {self.scan_info.msg.scan_id}") return super().stage() @@ -186,6 +215,7 @@ class SimWaveform(Device): Send reads from all config signals to redis """ logger.warning(f"Unstaging {self.name}, {self._staged}") + self._slice_index = 0 if self.stopped is True or not self._staged: return super().unstage() return super().unstage()