Code cleanup, needs checking

This commit is contained in:
gac-x05la
2024-08-30 17:09:48 +02:00
committed by mohacsi_i
parent 0c8c60cfe3
commit fc0e426850
8 changed files with 116 additions and 239 deletions
@@ -1,7 +1,7 @@
eyex:
readoutPriority: baseline
description: X-ray eye axis X
deviceClass: tomcat_bec.devices.psimotor.EpicsMotorMR
deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
deviceConfig:
prefix: MTEST-X05LA-ES2-XRAYEYE:M1
deviceTags:
@@ -13,7 +13,7 @@ eyex:
eyey:
readoutPriority: baseline
description: X-ray eye axis Y
deviceClass: tomcat_bec.devices.psimotor.EpicsMotorMR
deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
deviceConfig:
prefix: MTEST-X05LA-ES2-XRAYEYE:M2
deviceTags:
+1 -2
View File
@@ -8,8 +8,7 @@ from .aerotech.AerotechAutomation1 import (
aa1Tasks,
)
from .grashopper_tomcat import GrashopperTOMCAT
from .psimotor import EpicsMotorMR, EpicsMotorEC
from .gigafrost.gigafrostclient import GigaFrostClient
from .gigafrost.stddaq_preview import StdDaqPreview, StdDaqPreviewDetector
from .gigafrost.stddaq_preview import StdDaqPreviewDetector
+27 -21
View File
@@ -40,6 +40,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
detector class.
"""
def _define_backend_ip(self):
""" Select backend IP address for UDP stream"""
if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_IP, const.BE3_SOUTH_IP
if self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT:
@@ -48,6 +49,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
raise RuntimeError(f"Backend {self.parent.backendUrl.get()} not recognized.")
def _define_backend_mac(self):
""" Select backend MAC address for UDP stream"""
if self.parent.backendUrl.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC
if self.parent.backendUrl.get() == const.BE999_DAFL_CLIENT:
@@ -60,7 +62,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
self.parent.cfgConnectionParam.set(self._build_udp_header_table()).wait()
def _build_udp_header_table(self):
"""Build the header table for the communication"""
"""Build the header table for the UDP communication"""
udp_header_table = []
for i in range(0, 64, 1):
@@ -86,9 +88,15 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
return udp_header_table
def on_init(self) -> None:
"""Initialize the camera, set channel values"""
""" Initialize the camera, set channel values"""
# ToDo: Not sure if it's a good idea to change camera settings upon
# ophyd device startup, i.e. each deviceserver restart.
self._init_gigafrost()
self.parent._initialized = True
def _init_gigafrost(self) -> None:
""" Initialize the camera, set channel values"""
## Stop acquisition
self.parent.cmdStartCamera.set(0).wait()
@@ -132,7 +140,6 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
# Set udp header table
self._set_udp_header_table()
self.parent.state.put(const.GfStatus.INIT, force=True)
return super().on_init()
def on_stage(self) -> None:
@@ -149,9 +156,9 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
It must be safe to assume that the device is ready for the scan
to start immediately once this function is finished.
"""
# Either an acquisition is running or it's already done
# Gigafrost can finish a run without explicit unstaging
if self.parent.infoBusyFlag.value:
logger.warn("Camera is already busy, unstage it first!")
logger.warn("Camera is already busy, unstaging it first!")
self.parent.unstage()
sleep(0.5)
# Sync if out of sync
@@ -159,9 +166,6 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
self.parent.cmdSyncHw.set(1).wait()
# Switch to acquiring
self.parent.cmdStartCamera.set(1).wait()
self.parent.state.put(const.GfStatus.ACQUIRING, force=True)
# Gigafrost can finish a run without explicit unstaging
self.parent._staged = Staged.no
def on_unstage(self) -> None:
"""Specify actions to be executed during unstage.
@@ -174,7 +178,6 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
self.parent.cmdStartCamera.set(0).wait()
if self.parent.autoSoftEnable.get():
self.parent.cmdSoftEnable.set(0).wait()
self.parent.state.put(const.GfStatus.STOPPED, force=True)
def on_stop(self) -> None:
"""
@@ -198,7 +201,6 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
# BEC teststand operation mode: posedge of SoftEnable if Started
self.parent.cmdSoftEnable.set(0).wait()
self.parent.cmdSoftEnable.set(1).wait()
else:
self.parent.cmdSoftTrigger.set(1).wait()
@@ -230,6 +232,7 @@ class GigaFrostCamera(PSIDetectorBase):
custom_prepare_cls = GigaFrostCameraMixin
USER_ACCESS = [""]
_initialized = False
infoBusyFlag = Component(EpicsSignalRO, "BUSY_STAT", auto_monitor=True)
infoSyncFlag = Component(EpicsSignalRO, "SYNC_FLAG", auto_monitor=True)
@@ -390,7 +393,6 @@ class GigaFrostCamera(PSIDetectorBase):
macSouth = Component(Signal, kind=Kind.config)
ipNorth = Component(Signal, kind=Kind.config)
ipSouth = Component(Signal, kind=Kind.config)
state = Component(Signal, value=int(const.GfStatus.NEW), kind=Kind.config)
def __init__(
self,
@@ -407,25 +409,19 @@ class GigaFrostCamera(PSIDetectorBase):
self._signals_to_be_set['backend_url'] = backend_url
# super() will call the mixin class
super().__init__(
prefix=prefix,
name=name,
**kwargs,
)
super().__init__(prefix=prefix, name=name, **kwargs)
def _init(self):
"""Ugly hack: values must be set before on_init() is called"""
# Additional parameters
self.autoSoftEnable._metadata["write_access"] = False
self.backendUrl._metadata["write_access"] = False
self.state._metadata["write_access"] = False
self.autoSoftEnable.put(self._signals_to_be_set['auto_soft_enable'], force=True)
self.backendUrl.put(self._signals_to_be_set['backend_url'], force=True)
self.state.put(const.GfStatus.NEW, force=True)
return super()._init()
def trigger(self) -> DeviceStatus:
""" Sends a software trigger to GigaFrost"""
super().trigger()
# There's no status readback from the camera, so we just wait
@@ -469,6 +465,8 @@ class GigaFrostCamera(PSIDetectorBase):
"""
# Stop acquisition
self.unstage()
if not self._initialized:
pass
# If Bluesky style configure
if d is not None:
@@ -495,7 +493,15 @@ class GigaFrostCamera(PSIDetectorBase):
# Commit parameter
self.cmdSetParam.set(1).wait()
self.state.set(const.GfStatus.CONFIGURED, force=True)
def stage(self):
""" Standard stage command"""
if not self._initialized:
logger.warn(
"Ophyd device havent ran the initialization sequence,"
"IOC might be in unknown configuration."
)
return super().stage()
@property
def exposure_mode(self):
+19 -28
View File
@@ -7,8 +7,6 @@ Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
from ophyd import Component, DeviceStatus
from ophyd.device import Staged
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
@@ -49,16 +47,16 @@ class GigaFrostClientMixin(CustomDetectorMixin):
#self.parent.cam.stage()
def on_unstage(self) -> None:
"""
Specify actions to be executed during unstage.
# def on_unstage(self) -> None:
# """
# Specify actions to be executed during unstage.
This step should include checking if the acqusition was successful,
and publishing the file location and file event message,
with flagged done to BEC.
"""
self.parent.cam.unstage()
self.parent.daq.unstage()
# This step should include checking if the acqusition was successful,
# and publishing the file location and file event message,
# with flagged done to BEC.
# """
# self.parent.cam.unstage()
# self.parent.daq.unstage()
def on_stop(self) -> None:
"""
@@ -132,13 +130,7 @@ class GigaFrostClient(PSIDetectorBase):
self.__class__.__dict__["daq"].kwargs['ws_url'] = daq_ws_url
self.__class__.__dict__["daq"].kwargs['rest_url'] = daq_rest_url
super().__init__(
prefix=prefix,
name=name,
kind=kind,
**kwargs,
)
super().__init__(prefix=prefix, name=name, kind=kind, **kwargs)
def configure(self, d: dict=None):
"""Configure the next scan with the GigaFRoST camera and standard DAQ backend.
@@ -148,11 +140,11 @@ class GigaFrostClient(PSIDetectorBase):
Parameters
----------
ntotal : int, optional
Total mumber of images to be taken during the whole scan. Set to -1
for an unlimited number of images (limited by the ringbuffer size and
backend speed). (default = 10000)
Total mumber of images to be taken by the DAQ during the whole scan.
Set to -1 for an unlimited number of images (limited by the
ringbuffer size and backend speed). (default = 10000)
nimages : int, optional
Number of images to be taken during each trigger (i.e. burst).
Number of images to be taken during each trigger (i.e. burst).
Maximum is 16777215 images. (default = 10)
exposure : float, optional
Exposure time, max 40 ms. [ms]. (default = 0.2)
@@ -183,7 +175,6 @@ class GigaFrostClient(PSIDetectorBase):
"""
px_daq_h = self.daq.config.cfg_pixel_height.get()
px_daq_w = self.daq.config.cfg_pixel_width.get()
px_gf_w = self.cam.cfgRoiX.get()
px_gf_h = self.cam.cfgRoiY.get()
@@ -192,11 +183,11 @@ class GigaFrostClient(PSIDetectorBase):
return super().stage()
def trigger(self) -> DeviceStatus:
""" Triggers the current device and all sub-devices, i.e. the camera.
"""
status = super().trigger()
return status
# def trigger(self) -> DeviceStatus:
# """ Triggers the current device and all sub-devices, i.e. the camera.
# """
# status = super().trigger()
# return status
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
+14
View File
@@ -2,6 +2,20 @@
The GigaFrost camera IOC is a form from an ancient version of Helge's cameras.
As we're commissioning, the current folder also contains the standard DAQ client.
The ophyd implementation tries to balance between familiarity with the old
**gfclient** pyepics library and the BEC/bluesky event model.
# Examples
A simple code example with soft triggering:
'''
d = {'ntotal':100000, 'nimages':3009, 'exposure':10.0, 'period':20.0, 'pixel_width':2016, 'pixel_height':2016}
gfc.configure(d)
gfc.stage()
for ii in range(10):
gfc.trigger()
gfc.unstage()
'''
# Opening GigaFrost panel
+29 -157
View File
@@ -30,137 +30,6 @@ class StdDaqPreviewState(enum.IntEnum):
MONITORING = 2
class StdDaqPreview(Device):
"""Wrapper class around the StdDaq preview image stream.
This was meant to provide live image preview directly from the StdDAQ.
Note that the preview stream must be heavily throtled in order to cope
with the incoming data.
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# pylint: disable=too-many-instance-attributes
# Subscriptions for plotting image
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
# Status attributes
url = Component(Signal, kind=Kind.config)
status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted)
image = Component(Signal, kind=Kind.normal)
frame = Component(Signal, kind=Kind.normal)
image_shape = Component(Signal, kind=Kind.omitted)
value = Component(Signal, kind=Kind.hinted)
throttle = Component(Signal, value=0.2, kind=Kind.omitted)
def __init__(
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
) -> None:
super().__init__(*args, parent=parent, **kwargs)
self.url._metadata["write_access"] = False
self.status._metadata["write_access"] = False
self.image._metadata["write_access"] = False
self.frame._metadata["write_access"] = False
self.image_shape._metadata["write_access"] = False
self.value._metadata["write_access"] = False
self.url.set(url, force=True).wait()
self._stop_polling = False
self._mon = None
# Connect ro the DAQ
self.connect()
def connect(self):
"""Connect to te StDAQs PUB-SUB streaming interface
StdDAQ may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
# pylint: disable=no-member
# Socket to talk to server
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
try:
self._socket.connect(self.url.get())
except ConnectionRefusedError:
sleep(1)
self._socket.connect(self.url.get())
def stage(self) -> list:
"""Start listening for preview data stream"""
self.connect()
self._stop_polling = False
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
return super().stage()
def unstage(self):
"""Stop a running preview"""
self._stop_polling = True
return super().unstage()
def stop(self, *, success=False):
"""Stop a running preview"""
self.unstage()
def poll(self):
"""Collect streamed updates"""
self.status.set(StdDaqPreviewState.MONITORING, force=True)
t_last = time()
try:
while True:
# Exit loop and finish monitoring
if self._stop_polling:
logger.info(f"[{self.name}]\tDetaching monitor")
break
try:
# pylint: disable=no-member
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
if len(r)==2:
meta, data = r
else:
sleep(0.1)
continue
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed > self.throttle.get():
header = json.loads(meta)
if header["type"]=="uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header['shape'])
# Update image and update subscribers
self.frame.put(header['frame'], force=True)
self.image_shape.put(header['shape'], force=True)
self.image.put(image, force=True)
self._run_subs(sub_type=self.SUB_MONITOR, value=image)
t_last=t_curr
logger.info(
f"[{self.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
)
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.name}]\t{str(ex)}")
raise
finally:
self._mon = None
self.status.set(StdDaqPreviewState.DETACHED, force=True)
class StdDaqPreviewMixin(CustomDetectorMixin):
"""Setup class for the standard DAQ preview stream
@@ -195,8 +64,8 @@ class StdDaqPreviewMixin(CustomDetectorMixin):
def poll(self):
"""Collect streamed updates"""
self.parent.status.set(StdDaqPreviewState.MONITORING, force=True)
t_last = time()
try:
t_last = time()
while True:
try:
# Exit loop and finish monitoring
@@ -206,34 +75,37 @@ class StdDaqPreviewMixin(CustomDetectorMixin):
# pylint: disable=no-member
r = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK)
if len(r)==2:
meta, data = r
else:
sleep(0.1)
# Length and throtling checks
if len(r)!=2:
continue
# Update image and update subscribers
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed > self.parent.throttle.get():
header = json.loads(meta)
if header["type"]=="uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header['shape'])
sleep(0.1)
continue
# Update image and update subscribers
self.parent.frame.put(header['frame'], force=True)
self.parent.image_shape.put(header['shape'], force=True)
self.parent.image.put(image, force=True)
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
t_last=t_curr
logger.info(
f"[{self.parent.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
)
# Unpack the Array V1 reply to metadata and array data
meta, data = r
# Update image and update subscribers
header = json.loads(meta)
if header["type"]=="uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header['shape'])
# Update image and update subscribers
self.parent.frame.put(header['frame'], force=True)
self.parent.image_shape.put(header['shape'], force=True)
self.parent.image.put(image, force=True)
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
t_last=t_curr
logger.info(
f"[{self.parent.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
)
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
@@ -271,7 +143,7 @@ class StdDaqPreviewDetector(PSIDetectorBase):
status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted)
image = Component(Signal, kind=Kind.normal)
frame = Component(Signal, kind=Kind.hinted)
image_shape = Component(Signal, kind=Kind.omitted)
image_shape = Component(Signal, kind=Kind.normal)
def __init__(
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
@@ -308,5 +180,5 @@ class StdDaqPreviewDetector(PSIDetectorBase):
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
daq = StdDaqPreview(url="tcp://129.129.95.38:20000", name="preview")
daq = StdDaqPreviewDetector(url="tcp://129.129.95.38:20000", name="preview")
daq.wait_for_connection()
+16 -21
View File
@@ -39,7 +39,6 @@ class StdDaqRestClient(Device):
# pylint: disable=too-many-instance-attributes
USER_ACCESS = ["write_daq_config"]
_config_read = False
# Status attributes
rest_url = Component(Signal, kind=Kind.config)
@@ -101,36 +100,32 @@ class StdDaqRestClient(Device):
self.cfg_module_sync_queue_size.set(cfg['module_sync_queue_size']).wait()
#self.cfg_module_positions.set(cfg['module_positions']).wait()
self._config_read = True
def _build_config(self, orig) -> dict:
def _build_config(self, orig=None) -> dict:
config = {
'detector_name': str(self.cfg_detector_name.get()),
'detector_type': str(self.cfg_detector_type.get()),
'n_modules': int(self.cfg_n_modules.get()),
'bit_depth': int(self.cfg_bit_depth.get()),
# 'detector_name': str(self.cfg_detector_name.get()),
# 'detector_type': str(self.cfg_detector_type.get()),
# 'n_modules': int(self.cfg_n_modules.get()),
# 'bit_depth': int(self.cfg_bit_depth.get()),
'image_pixel_height': int(self.cfg_pixel_height.get()),
'image_pixel_width': int(self.cfg_pixel_width.get()),
'start_udp_port': int(self.cfg_start_udp_port.get()),
'writer_user_id': int(self.cfg_writer_user_id.get()),
'log_level': "info",
'submodule_info': {},
'max_number_of_forwarders_spawned': int(self.cfg_max_number_of_forwarders.get()),
'use_all_forwarders': bool(self.cfg_use_all_forwarders.get()),
'module_sync_queue_size': int(self.cfg_module_sync_queue_size.get()),
'module_positions': {},
'number_of_writers': 14
# 'start_udp_port': int(self.cfg_start_udp_port.get()),
# 'writer_user_id': int(self.cfg_writer_user_id.get()),
# 'log_level': "info",
# 'submodule_info': {},
# 'max_number_of_forwarders_spawned': int(self.cfg_max_number_of_forwarders.get()),
# 'use_all_forwarders': bool(self.cfg_use_all_forwarders.get()),
# 'module_sync_queue_size': int(self.cfg_module_sync_queue_size.get()),
# 'module_positions': {},
# 'number_of_writers': 14
}
config = orig.update(config)
if orig is not None:
config = orig.update(config)
return config
def write_daq_config(self):
"""Write configuration ased on current PV values. Some fields might be
unchangeable.
"""
if not self._config_read:
raise RuntimeError("Pleae read config before editing")
orig = self.get_daq_config()
config = self._build_config(orig)
+8 -8
View File
@@ -45,6 +45,7 @@ class StdDaqClient(Device):
# pylint: disable=too-many-instance-attributes
# Status attributes
url = Component(Signal, kind=Kind.config)
status = Component(Signal, value="unknown", kind=Kind.normal)
n_total = Component(Signal, value=10000, kind=Kind.config)
file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config)
@@ -63,6 +64,8 @@ class StdDaqClient(Device):
super().__init__(*args, parent=parent, **kwargs)
self.status._metadata["write_access"] = False
self.url._metadata["write_access"] = False
self.url.set(ws_url).wait()
self._ws_url = ws_url
self._mon = None
@@ -71,10 +74,10 @@ class StdDaqClient(Device):
self.connect()
def connect(self):
"""Connect to te StdDAQ's websockets interface
"""Connect to the StdDAQ's websockets interface
StdDAQ may reject connection for a few seconds after restart,
so if it fails, wait a bit and try to connect again.
StdDAQ may reject connection for a few seconds after restart, or when
it wants so if it fails, wait a bit and try to connect again.
"""
num_retry = 0
while num_retry < 5:
@@ -88,10 +91,6 @@ class StdDaqClient(Device):
raise ConnectionRefusedError(
"The stdDAQ websocket interface refused connection 5 times.")
def __del__(self):
"""Try to close the socket"""
self._client.close_socket()
def monitor(self):
"""Attach monitoring to the DAQ"""
self._client = connect(self._ws_url)
@@ -190,7 +189,7 @@ class StdDaqClient(Device):
def message(self, message: dict, timeout=1, wait_reply=True):
"""Send a message to the StdDAQ and receive a reply
Note: finishing acquisition meang StdDAQ will close connections so
Note: finishing acquisition means StdDAQ will close connection, so
there's no idle state polling.
"""
if isinstance(message, dict):
@@ -204,6 +203,7 @@ class StdDaqClient(Device):
except (ConnectionClosedError, ConnectionClosedOK):
self.connect()
self._client.send(msg)
# Wait for reply
reply = None
if wait_reply: