ophyd_devices/ophyd_devices/epics/devices/bec_scaninfo_mixin.py

56 lines
2.0 KiB
Python

import os
from bec_lib.core import DeviceManagerBase, BECMessage, MessageEndpoints
class BecScaninfoMixin:
def __init__(self, device_manager: DeviceManagerBase = None, sim_mode=False) -> None:
self.device_manager = device_manager
self.sim_mode = sim_mode
self.bec_info_msg = {
"RID": "mockrid",
"queueID": "mockqueuid",
"scan_number": 1,
"exp_time": 26e-3,
"num_points": 10000,
"readout_time": 2e-3,
"scan_type": "fly",
}
def get_bec_info_msg(self) -> None:
return self.bec_info_msg
def change_config(self, bec_info_msg: dict) -> None:
self.bec_info_msg = bec_info_msg
def _get_current_scan_msg(self) -> BECMessage.ScanStatusMessage:
if not self.sim_mode:
msg = self.device_manager.producer.get(MessageEndpoints.scan_status())
return BECMessage.ScanStatusMessage.loads(msg)
return BECMessage.ScanStatusMessage(
scanID="1",
status={},
info=self.bec_info_msg,
)
def _get_username(self) -> str:
if not self.sim_mode:
return self.device_manager.producer.get(MessageEndpoints.account()).decode()
return os.getlogin()
def load_scan_metadata(self) -> None:
scan_msg = self._get_current_scan_msg()
self.metadata = {
"scanID": scan_msg.content["scanID"],
"RID": scan_msg.content["info"]["RID"],
"queueID": scan_msg.content["info"]["queueID"],
}
self.scanID = scan_msg.content["scanID"]
self.scan_number = scan_msg.content["info"]["scan_number"]
self.exp_time = scan_msg.content["info"]["exp_time"]
self.num_frames = scan_msg.content["info"]["num_points"]
self.scan_type = scan_msg.content["info"].get("scan_type", "step")
self.readout_time = scan_msg.content["info"]["readout_time"]
self.username = self._get_username()