diff --git a/ophyd_devices/epics/devices/mcs_csaxs.py b/ophyd_devices/epics/devices/mcs_csaxs.py index af8bd21..dc702b4 100644 --- a/ophyd_devices/epics/devices/mcs_csaxs.py +++ b/ophyd_devices/epics/devices/mcs_csaxs.py @@ -13,6 +13,7 @@ from ophyd_devices.utils import bec_utils from bec_lib.core import BECMessage, MessageEndpoints from bec_lib.core.file_utils import FileWriterMixin +from collections import defaultdict from bec_lib.core import bec_logger @@ -90,14 +91,22 @@ class SIS38XX(Device): max_channels = Cpt(EpicsSignalRO, "MaxChannels") -class McsCsaxs(SIS38XX): - scaler = Cpt(ScalerCH, "scaler1") +num_lines = Cpt( + bec_utils.ConfigSignal, + name="num_lines", + kind="config", + config_storage_name="mcs_configs", +) - mca1 = Cpt(EpicsMCARecord, "mca1") - mca2 = Cpt(EpicsMCARecord, "mca2") - mca3 = Cpt(EpicsMCARecord, "mca3") - mca4 = Cpt(EpicsMCARecord, "mca4") - mca5 = Cpt(EpicsMCARecord, "mca5") + +class McsCsaxs(SIS38XX): + # scaler = Cpt(ScalerCH, "scaler1") + + # mca2 = Cpt(EpicsMCARecord, "mca2") + mca1 = Cpt(EpicsSignalRO, "mca1.VAL", auto_monitor=True) + mca3 = Cpt(EpicsSignalRO, "mca3.VAL", auto_monitor=True) + mca4 = Cpt(EpicsSignalRO, "mca4.VAL", auto_monitor=True) + # mca5 = Cpt(EpicsMCARecord, "mca5") # mca6 = Cpt(EpicsMCARecord, "mca6") # mca7 = Cpt(EpicsMCARecord, "mca7") # mca8 = Cpt(EpicsMCARecord, "mca8") @@ -139,6 +148,10 @@ class McsCsaxs(SIS38XX): sim_mode=False, **kwargs, ): + self.mcs_configs = { + f"{name}_num_lines": 1, + } + super().__init__( prefix=prefix, name=name, @@ -148,23 +161,29 @@ class McsCsaxs(SIS38XX): parent=parent, **kwargs, ) + if device_manager is None and not sim_mode: raise MCSError("Add DeviceManager to initialization or init with sim_mode=True") self.name = name + self._stream_ttl = 1800 self.wait_for_connection() # Make sure to be connected before talking to PVs + if not sim_mode: self.device_manager = device_manager self._producer = self.device_manager.producer else: self._producer = bec_utils.MockProducer() self.device_manager = bec_utils.MockDeviceManager() + # TODO mack mock connector class + # self._consumer = self.device_manager.connector.consumer self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) # TODO self.scaninfo.username = "e21206" self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} self.filewriter = FileWriterMixin(self.service_cfg) self._stopped = False + self._acquisition_done = False self._init_mcs() def _init_mcs(self) -> None: @@ -183,14 +202,50 @@ class McsCsaxs(SIS38XX): self._set_trigger(TriggerSource.MODE3) self.input_polarity.set(0) self.count_on_start.set(0) + self.mca_names = [signal for signal in self.signal_names if signal.startswith("mca")] + for mca in self.mca_names: + signal = getattr(self, mca) + signal.subscribe(self._on_mca_data) + self.mca_data = defaultdict(lambda: []) + self._counter = 0 + + def _on_mca_data(self, *args, obj=None, value=None, **kwargs) -> None: + self.mca_data[obj.attr_name] = value + if len(self.mca_names) == len(self.mca_data) and len(self.mca_data[self.mca_names]) != 0: + self._updated = True + self.erase_start.set(1) + self._send_data_to_bec() + self.mca_data = defaultdict(lambda: []) + self.counter += 1 + if self.counter == self.num_lines: + self._acquisition_done = True + + def _send_data_to_bec(self) -> None: + metadata = self.scaninfo.scan_msg.metadata + metadata.update( + { + "async_update": "append", + "num_lines": self.num_lines, + } + ) + msg = BECMessage.DeviceMessage( + signals=dict(self.mca_data), metadata=self.scaninfo.scan_msg.metadata + ).dumps() + self._producer.xadd( + topics=MessageEndpoints._device_async_readback(self.name), + msg=msg, + expire=self._stream_ttl, + ) def _prep_det(self) -> None: self._set_acquisition_params() self._set_trigger(TriggerSource.MODE3) def _set_acquisition_params(self) -> None: - # max number of readings is limited to 10000, but device can be reseted.. needs to be included on scan level - self.num_use_all.set(self.scaninfo.num_frames) + n_points = self.scaninfo.num_frames / int(self.num_lines.get()) + if n_points > 10000: + raise MCSError(f"Requested number of points {n_points} exceeds hardware limit of 10000") + self.num_use_all.set(n_points) self.preset_real.set(0) def _set_trigger(self, trigger_source: TriggerSource) -> None: @@ -204,24 +259,35 @@ class McsCsaxs(SIS38XX): Check ReadoutMode class for more information about options """ # self.read_mode.set(ReadoutMode.EVENT) - self.read_mode.set(ReadoutMode.PASSIVE) + self.erase_all.set(1) + self.read_mode.set(ReadoutMode.EVENT) - def _read_mcs_card(self) -> None: - # TODO how to properly trigger the readout!!! + def _force_readout_mcs_card(self) -> None: self.read_all.put(1, use_complete=False) - def readout_data(self) -> List: - self._read_mcs_card() - readback = [] - for ii in range(1, int(self.mux_output.read()[self.mux_output.name]["value"]) + 1): - readback.append(self._readout_mca_channels(ii)) - return readback + # TODO does not work anymore with new mca signals + # def readout_data(self) -> List[List]: + # """Manual readout of mca slots, returns list of lists""" + # self._force_readout_mcs_card() + # readback = [] + # for ii in range(1, int(self.mux_output.read()[self.mux_output.name]["value"]) + 1): + # readback.append(self._readout_mca_channels(ii)) + # return readback - def _readout_mca_channels(self, num: int) -> List[List]: - signal = f"mca{num}" - if signal in self.component_names: - readback = f"{getattr(self, signal).name}_spectrum" - return getattr(self, signal).read()[readback]["value"] + # def _readout_mca_channels(self, num: int) -> List: + # """readout of single mca channel""" + # signal = f"mca{num}" + # if signal in self.component_names: + # readback = f"{getattr(self, signal).name}_spectrum" + # return getattr(self, signal).read()[readback]["value"] + + def _start_readout_loop(self) -> None: + self._readout_lines() + # stop acquisition and clean up data + self.stop_all.set(1) + self.erase_all.set(1) + self._acquisition_done = True + self._updated = False def stage(self) -> List[object]: """stage the detector and file writer""" @@ -248,15 +314,14 @@ class McsCsaxs(SIS38XX): def unstage(self) -> List[object]: """unstage""" logger.info("Waiting for mcs to finish acquisition") - while True: - det_ctrl = self.acquiring.read()[self.acquiring.name]["value"] - if det_ctrl == 0: - break + # start readout in thread? + self._start_readout_loop() + while not self._acquisition_done: + # monitor signal instead? if self._stopped: break time.sleep(0.005) - if not self._stopped: - self._read_mcs_card() + # Message to BEC # state = True @@ -265,7 +330,10 @@ class McsCsaxs(SIS38XX): # MessageEndpoints.public_file(self.metadata["scanID"], self.name), # msg.dumps(), # ) + + self._acquisition_done = False self._stopped = False + logger.info("mcs done") return super().unstage() def arm_acquisition(self) -> None: @@ -274,6 +342,7 @@ class McsCsaxs(SIS38XX): Start: start_all Erase/Start: erase_start """ + self.counter = 0 self.erase_start.set(1) # self.start_all.set(1) @@ -283,9 +352,8 @@ class McsCsaxs(SIS38XX): """ self.stop_all.set(1) # self.erase_all.set(1) - # self.unstage() - super().stop(success=success) self._stopped = True + super().stop(success=success) # Automatically connect to test environmenr if directly invoked