363 lines
12 KiB
Python

import enum
import time
from typing import Any, List
import numpy as np
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd import EpicsSignal, EpicsSignalRO, Component as Cpt, Device
from ophyd.mca import EpicsMCARecord
from ophyd.scaler import ScalerCH
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils
from bec_lib.core import BECMessage, MessageEndpoints
from bec_lib.core.file_utils import FileWriterMixin
from collections import defaultdict
from bec_lib.core import bec_logger
logger = bec_logger.logger
class MCSError(Exception):
pass
class TriggerSource(int, enum.Enum):
MODE0 = 0
MODE1 = 1
MODE2 = 2
MODE3 = 3
MODE4 = 4
MODE5 = 5
MODE6 = 6
class ChannelAdvance(int, enum.Enum):
INTERNAL = 0
EXTERNAL = 1
class ReadoutMode(int, enum.Enum):
PASSIVE = 0
EVENT = 1
IO_INTR = 2
FREQ_0_1HZ = 3
FREQ_0_2HZ = 4
FREQ_0_5HZ = 5
FREQ_1HZ = 6
FREQ_2HZ = 7
FREQ_5HZ = 8
FREQ_10HZ = 9
FREQ_100HZ = 10
class SIS38XX(Device):
"""SIS38XX control"""
# Acquisition
erase_all = Cpt(EpicsSignal, "EraseAll")
erase_start = Cpt(EpicsSignal, "EraseStart", trigger_value=1)
start_all = Cpt(EpicsSignal, "StartAll")
stop_all = Cpt(EpicsSignal, "StopAll")
acquiring = Cpt(EpicsSignal, "Acquiring")
preset_real = Cpt(EpicsSignal, "PresetReal")
elapsed_real = Cpt(EpicsSignal, "ElapsedReal")
read_mode = Cpt(EpicsSignal, "ReadAll.SCAN")
read_all = Cpt(EpicsSignal, "DoReadAll.VAL", trigger_value=1)
num_use_all = Cpt(EpicsSignal, "NuseAll")
current_channel = Cpt(EpicsSignal, "CurrentChannel")
dwell = Cpt(EpicsSignal, "Dwell")
channel_advance = Cpt(EpicsSignal, "ChannelAdvance")
count_on_start = Cpt(EpicsSignal, "CountOnStart")
software_channel_advance = Cpt(EpicsSignal, "SoftwareChannelAdvance")
channel1_source = Cpt(EpicsSignal, "Channel1Source")
prescale = Cpt(EpicsSignal, "Prescale")
enable_client_wait = Cpt(EpicsSignal, "EnableClientWait")
client_wait = Cpt(EpicsSignal, "ClientWait")
acquire_mode = Cpt(EpicsSignal, "AcquireMode")
mux_output = Cpt(EpicsSignal, "MUXOutput")
user_led = Cpt(EpicsSignal, "UserLED")
input_mode = Cpt(EpicsSignal, "InputMode")
input_polarity = Cpt(EpicsSignal, "InputPolarity")
output_mode = Cpt(EpicsSignal, "OutputMode")
output_polarity = Cpt(EpicsSignal, "OutputPolarity")
model = Cpt(EpicsSignalRO, "Model", string=True)
firmware = Cpt(EpicsSignalRO, "Firmware")
max_channels = Cpt(EpicsSignalRO, "MaxChannels")
num_lines = Cpt(
bec_utils.ConfigSignal,
name="num_lines",
kind="config",
config_storage_name="mcs_configs",
)
class McsCsaxs(SIS38XX):
# scaler = Cpt(ScalerCH, "scaler1")
# mca2 = Cpt(EpicsMCARecord, "mca2")
mca1 = Cpt(EpicsSignalRO, "mca1.VAL", auto_monitor=True)
mca3 = Cpt(EpicsSignalRO, "mca3.VAL", auto_monitor=True)
mca4 = Cpt(EpicsSignalRO, "mca4.VAL", auto_monitor=True)
# mca5 = Cpt(EpicsMCARecord, "mca5")
# mca6 = Cpt(EpicsMCARecord, "mca6")
# mca7 = Cpt(EpicsMCARecord, "mca7")
# mca8 = Cpt(EpicsMCARecord, "mca8")
# mca9 = Cpt(EpicsMCARecord, "mca9")
# mca10 = Cpt(EpicsMCARecord, "mca10")
# mca11 = Cpt(EpicsMCARecord, "mca11")
# mca12 = Cpt(EpicsMCARecord, "mca12")
# mca13 = Cpt(EpicsMCARecord, "mca13")
# mca14 = Cpt(EpicsMCARecord, "mca14")
# mca15 = Cpt(EpicsMCARecord, "mca15")
# mca16 = Cpt(EpicsMCARecord, "mca16")
# mca17 = Cpt(EpicsMCARecord, "mca17")
# mca18 = Cpt(EpicsMCARecord, "mca18")
# mca19 = Cpt(EpicsMCARecord, "mca19")
# mca20 = Cpt(EpicsMCARecord, "mca20")
# mca21 = Cpt(EpicsMCARecord, "mca21")
# mca22 = Cpt(EpicsMCARecord, "mca22")
# mca23 = Cpt(EpicsMCARecord, "mca23")
# mca24 = Cpt(EpicsMCARecord, "mca24")
# mca25 = Cpt(EpicsMCARecord, "mca25")
# mca26 = Cpt(EpicsMCARecord, "mca26")
# mca27 = Cpt(EpicsMCARecord, "mca27")
# mca28 = Cpt(EpicsMCARecord, "mca28")
# mca29 = Cpt(EpicsMCARecord, "mca29")
# mca30 = Cpt(EpicsMCARecord, "mca30")
# mca31 = Cpt(EpicsMCARecord, "mca31")
# mca32 = Cpt(EpicsMCARecord, "mca32")
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
self.mcs_configs = {
f"{name}_num_lines": 1,
}
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
if device_manager is None and not sim_mode:
raise MCSError("Add DeviceManager to initialization or init with sim_mode=True")
self.name = name
self._stream_ttl = 1800
self.wait_for_connection() # Make sure to be connected before talking to PVs
if not sim_mode:
self.device_manager = device_manager
self._producer = self.device_manager.producer
else:
self._producer = bec_utils.MockProducer()
self.device_manager = bec_utils.MockDeviceManager()
# TODO mack mock connector class
# self._consumer = self.device_manager.connector.consumer
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
# TODO
self.scaninfo.username = "e21206"
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"}
self.filewriter = FileWriterMixin(self.service_cfg)
self._stopped = False
self._acquisition_done = False
self._init_mcs()
def _init_mcs(self) -> None:
"""Init parameters for mcs card 9m
channel_advance: 0/1 -> internal / external
channel1_source: 0/1 -> int clock / external source
user_led: 0/1 -> off/on
max_output : num of channels 0...32, uncomment top for more than 5
input_mode: operation mode -> Mode 3 for external trigger, check manual for more info
input_polarity: triggered between falling and falling edge -> use inverted signal from ddg
"""
self.channel_advance.set(ChannelAdvance.EXTERNAL)
self.channel1_source.set(ChannelAdvance.INTERNAL)
self.user_led.set(0)
self.mux_output.set(5)
self._set_trigger(TriggerSource.MODE3)
self.input_polarity.set(0)
self.count_on_start.set(0)
self.mca_names = [signal for signal in self.signal_names if signal.startswith("mca")]
for mca in self.mca_names:
signal = getattr(self, mca)
signal.subscribe(self._on_mca_data)
self.mca_data = defaultdict(lambda: [])
self._counter = 0
def _on_mca_data(self, *args, obj=None, value=None, **kwargs) -> None:
self.mca_data[obj.attr_name] = value
if len(self.mca_names) == len(self.mca_data) and len(self.mca_data[self.mca_names]) != 0:
self._updated = True
self.erase_start.set(1)
self._send_data_to_bec()
self.mca_data = defaultdict(lambda: [])
self.counter += 1
if self.counter == self.num_lines:
self._acquisition_done = True
def _send_data_to_bec(self) -> None:
metadata = self.scaninfo.scan_msg.metadata
metadata.update(
{
"async_update": "append",
"num_lines": self.num_lines,
}
)
msg = BECMessage.DeviceMessage(
signals=dict(self.mca_data), metadata=self.scaninfo.scan_msg.metadata
).dumps()
self._producer.xadd(
topics=MessageEndpoints._device_async_readback(self.name),
msg=msg,
expire=self._stream_ttl,
)
def _prep_det(self) -> None:
self._set_acquisition_params()
self._set_trigger(TriggerSource.MODE3)
def _set_acquisition_params(self) -> None:
n_points = self.scaninfo.num_frames / int(self.num_lines.get())
if n_points > 10000:
raise MCSError(f"Requested number of points {n_points} exceeds hardware limit of 10000")
self.num_use_all.set(n_points)
self.preset_real.set(0)
def _set_trigger(self, trigger_source: TriggerSource) -> None:
"""7 Modes, see TriggerSource
Mode3 for cSAXS"""
value = int(trigger_source)
self.input_mode.set(value)
def _prep_readout(self) -> None:
"""Set readout mode of mcs card
Check ReadoutMode class for more information about options
"""
# self.read_mode.set(ReadoutMode.EVENT)
self.erase_all.set(1)
self.read_mode.set(ReadoutMode.EVENT)
def _force_readout_mcs_card(self) -> None:
self.read_all.put(1, use_complete=False)
# TODO does not work anymore with new mca signals
# def readout_data(self) -> List[List]:
# """Manual readout of mca slots, returns list of lists"""
# self._force_readout_mcs_card()
# readback = []
# for ii in range(1, int(self.mux_output.read()[self.mux_output.name]["value"]) + 1):
# readback.append(self._readout_mca_channels(ii))
# return readback
# def _readout_mca_channels(self, num: int) -> List:
# """readout of single mca channel"""
# signal = f"mca{num}"
# if signal in self.component_names:
# readback = f"{getattr(self, signal).name}_spectrum"
# return getattr(self, signal).read()[readback]["value"]
def _start_readout_loop(self) -> None:
self._readout_lines()
# stop acquisition and clean up data
self.stop_all.set(1)
self.erase_all.set(1)
self._acquisition_done = True
self._updated = False
def stage(self) -> List[object]:
"""stage the detector and file writer"""
self.scaninfo.load_scan_metadata()
self._prep_det()
self._prep_readout()
# msg = BECMessage.FileMessage(file_path=self.filepath, done=False)
# self._producer.set_and_publish(
# MessageEndpoints.public_file(self.scaninfo.scanID, "mcs_csaxs"),
# msg.dumps(),
# )
self.arm_acquisition()
logger.info("Waiting for mcs to be armed")
while True:
det_ctrl = self.acquiring.read()[self.acquiring.name]["value"]
if det_ctrl == 1:
break
time.sleep(0.005)
logger.info("mcs is ready and running")
return super().stage()
def unstage(self) -> List[object]:
"""unstage"""
logger.info("Waiting for mcs to finish acquisition")
# start readout in thread?
self._start_readout_loop()
while not self._acquisition_done:
# monitor signal instead?
if self._stopped:
break
time.sleep(0.005)
# Message to BEC
# state = True
# msg = BECMessage.FileMessage(file_path=self.filepath, done=True, successful=state)
# self._producer.set_and_publish(
# MessageEndpoints.public_file(self.metadata["scanID"], self.name),
# msg.dumps(),
# )
self._acquisition_done = False
self._stopped = False
logger.info("mcs done")
return super().unstage()
def arm_acquisition(self) -> None:
"""Arm acquisition
Options:
Start: start_all
Erase/Start: erase_start
"""
self.counter = 0
self.erase_start.set(1)
# self.start_all.set(1)
def stop(self, *, success=False) -> None:
"""Stop acquisition
Stop or Stop and Erase
"""
self.stop_all.set(1)
# self.erase_all.set(1)
self._stopped = True
super().stop(success=success)
# Automatically connect to test environmenr if directly invoked
if __name__ == "__main__":
mcs = McsCsaxs(name="mcs", prefix="X12SA-MCS:", sim_mode=True)
mcs.stage()