feat(sim_waveform): added option to emit data with add_slice

This commit is contained in:
2025-03-06 18:15:17 +01:00
parent 71a81d28fd
commit 56bf4c9bba

View File

@@ -4,6 +4,7 @@ import os
import threading import threading
import time import time
import traceback import traceback
from typing import Any
import numpy as np import numpy as np
from bec_lib import messages from bec_lib import messages
@@ -64,6 +65,8 @@ class SimWaveform(Device):
) )
# Can be extend or append # Can be extend or append
async_update = Cpt(SetableSignal, value="append", kind=Kind.config) async_update = Cpt(SetableSignal, value="append", kind=Kind.config)
slice_size = Cpt(SetableSignal, value=100, dtype=np.int32, kind=Kind.config)
slice_update = Cpt(SetableSignal, value=False, dtype=bool, kind=Kind.config)
def __init__( def __init__(
self, self,
@@ -94,6 +97,7 @@ class SimWaveform(Device):
self.scan_info = scan_info self.scan_info = scan_info
if self.sim_init: if self.sim_init:
self.sim.set_init(self.sim_init) self.sim.set_init(self.sim_init)
self._slice_index = 0
@property @property
def registered_proxies(self) -> None: def registered_proxies(self) -> None:
@@ -113,8 +117,23 @@ class SimWaveform(Device):
def acquire(status: DeviceStatus): def acquire(status: DeviceStatus):
try: try:
for _ in range(self.burst.get()): for _ in range(self.burst.get()):
self._run_subs(sub_type=self.SUB_MONITOR, value=self.waveform.get()) values = self.waveform.get()
self._send_async_update() if self.slice_update.get():
size = self.slice_size.get()
mod = len(values) % size
num_slices = len(values) // size + int(mod > 0)
for i in range(num_slices):
value_slice = values[i * size : min((i + 1) * size, len(values))]
logger.info(
f"Sending slice {i} of {self._slice_index} with length {len(value_slice)}"
)
self._run_subs(sub_type=self.SUB_MONITOR, value=value_slice)
self._send_async_update(index=self._slice_index, value=value_slice)
self._slice_index += 1
else:
value = self.waveform.get()
self._run_subs(sub_type=self.SUB_MONITOR, value=value)
self._send_async_update(value=value)
if self.stopped: if self.stopped:
raise DeviceStopError(f"{self.name} was stopped") raise DeviceStopError(f"{self.name} was stopped")
status.set_finished() status.set_finished()
@@ -128,7 +147,7 @@ class SimWaveform(Device):
self._trigger_thread.start() self._trigger_thread.start()
return status return status
def _send_async_update(self): def _send_async_update(self, index: int | None = None, value: Any = None) -> None:
"""Send the async update to BEC.""" """Send the async update to BEC."""
async_update_type = self.async_update.get() async_update_type = self.async_update.get()
if async_update_type not in ["extend", "append"]: if async_update_type not in ["extend", "append"]:
@@ -136,12 +155,21 @@ class SimWaveform(Device):
waveform_shape = self.waveform_shape.get() waveform_shape = self.waveform_shape.get()
if async_update_type == "append": if async_update_type == "append":
if index is not None:
metadata = {
"async_update": {
"type": "add_slice",
"index": index,
"max_shape": [None, waveform_shape],
}
}
else:
metadata = {"async_update": {"type": "add", "max_shape": [None, waveform_shape]}} metadata = {"async_update": {"type": "add", "max_shape": [None, waveform_shape]}}
else: else:
metadata = {"async_update": {"type": "add", "max_shape": [None]}} metadata = {"async_update": {"type": "add", "max_shape": [None]}}
msg = messages.DeviceMessage( msg = messages.DeviceMessage(
signals={self.waveform.name: {"value": self.waveform.get(), "timestamp": time.time()}}, signals={self.waveform.name: {"value": value, "timestamp": time.time()}},
metadata=metadata, metadata=metadata,
) )
# logger.warning(f"Adding async update to {self.name} and {self.scan_info.msg.scan_id}") # logger.warning(f"Adding async update to {self.name} and {self.scan_info.msg.scan_id}")
@@ -177,6 +205,7 @@ class SimWaveform(Device):
self.exp_time.set(self.scan_info.msg.scan_parameters["exp_time"]) self.exp_time.set(self.scan_info.msg.scan_parameters["exp_time"])
self.burst.set(self.scan_info.msg.scan_parameters["frames_per_trigger"]) self.burst.set(self.scan_info.msg.scan_parameters["frames_per_trigger"])
self.stopped = False self.stopped = False
self._slice_index = 0
logger.warning(f"Staged {self.name}, scan_id : {self.scan_info.msg.scan_id}") logger.warning(f"Staged {self.name}, scan_id : {self.scan_info.msg.scan_id}")
return super().stage() return super().stage()
@@ -186,6 +215,7 @@ class SimWaveform(Device):
Send reads from all config signals to redis Send reads from all config signals to redis
""" """
logger.warning(f"Unstaging {self.name}, {self._staged}") logger.warning(f"Unstaging {self.name}, {self._staged}")
self._slice_index = 0
if self.stopped is True or not self._staged: if self.stopped is True or not self._staged:
return super().unstage() return super().unstage()
return super().unstage() return super().unstage()