WIP
This commit is contained in:
@@ -126,6 +126,7 @@ gfcam:
|
||||
prefix: 'X02DA-CAM-GF2:'
|
||||
backend_url: 'http://sls-daq-001:8080'
|
||||
auto_soft_enable: true
|
||||
std_daq_live: 'tcp://129.129.95.111:20000'
|
||||
deviceTags:
|
||||
- camera
|
||||
- trigger
|
||||
@@ -146,13 +147,13 @@ gfdaq:
|
||||
deviceTags:
|
||||
- std-daq
|
||||
- gfcam
|
||||
enabled: true
|
||||
enabled: false
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: monitored
|
||||
softwareTrigger: false
|
||||
|
||||
daq_stream0:
|
||||
gf_stream0:
|
||||
description: stdDAQ preview (2 every 555)
|
||||
deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
|
||||
deviceConfig:
|
||||
@@ -160,56 +161,41 @@ daq_stream0:
|
||||
deviceTags:
|
||||
- std-daq
|
||||
- gfcam
|
||||
enabled: true
|
||||
enabled: false
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: monitored
|
||||
softwareTrigger: false
|
||||
|
||||
daq_stream1:
|
||||
description: stdDAQ preview (1 at 5 Hz)
|
||||
deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
|
||||
deviceConfig:
|
||||
url: 'tcp://129.129.95.111:20001'
|
||||
deviceTags:
|
||||
- std-daq
|
||||
- gfcam
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: monitored
|
||||
softwareTrigger: false
|
||||
# pcocam:
|
||||
# description: PCO.edge camera client
|
||||
# deviceClass: tomcat_bec.devices.PcoEdge5M
|
||||
# deviceConfig:
|
||||
# prefix: 'X02DA-CCDCAM2:'
|
||||
# deviceTags:
|
||||
# - camera
|
||||
# - trigger
|
||||
# - pcocam
|
||||
# enabled: true
|
||||
# onFailure: buffer
|
||||
# readOnly: false
|
||||
# readoutPriority: monitored
|
||||
# softwareTrigger: true
|
||||
|
||||
|
||||
pcocam:
|
||||
description: PCO.edge camera client
|
||||
deviceClass: tomcat_bec.devices.PcoEdge5M
|
||||
deviceConfig:
|
||||
prefix: 'X02DA-CCDCAM2:'
|
||||
deviceTags:
|
||||
- camera
|
||||
- trigger
|
||||
- pcocam
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: monitored
|
||||
softwareTrigger: true
|
||||
|
||||
pcodaq:
|
||||
description: GigaFrost stdDAQ client
|
||||
deviceClass: tomcat_bec.devices.StdDaqClient
|
||||
deviceConfig:
|
||||
ws_url: 'ws://129.129.95.111:8081'
|
||||
rest_url: 'http://129.129.95.111:5010'
|
||||
deviceTags:
|
||||
- std-daq
|
||||
- pcocam
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: monitored
|
||||
softwareTrigger: false
|
||||
# pcodaq:
|
||||
# description: GigaFrost stdDAQ client
|
||||
# deviceClass: tomcat_bec.devices.StdDaqClient
|
||||
# deviceConfig:
|
||||
# ws_url: 'ws://129.129.95.111:8081'
|
||||
# rest_url: 'http://129.129.95.111:5010'
|
||||
# deviceTags:
|
||||
# - std-daq
|
||||
# - pcocam
|
||||
# enabled: true
|
||||
# onFailure: buffer
|
||||
# readOnly: false
|
||||
# readoutPriority: monitored
|
||||
# softwareTrigger: false
|
||||
|
||||
pco_stream0:
|
||||
description: stdDAQ preview (2 every 555)
|
||||
|
||||
@@ -59,6 +59,7 @@ class GigaFrostBase(Device):
|
||||
file_path = Cpt(Signal, kind=Kind.config, value="")
|
||||
file_prefix = Cpt(Signal, kind=Kind.config, value="")
|
||||
num_images = Cpt(Signal, kind=Kind.config, value=1)
|
||||
num_images_counter = Cpt(Signal, kind=Kind.hinted, value=0)
|
||||
|
||||
# GF specific interface
|
||||
acquire_block = Cpt(Signal, kind=Kind.config, value=0)
|
||||
|
||||
@@ -484,8 +484,9 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
|
||||
self.backend.shutdown()
|
||||
super().destroy()
|
||||
|
||||
# def _on_preview_update(self, img:np.ndarray):
|
||||
# self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img)
|
||||
def _on_preview_update(self, img:np.ndarray, header: dict):
|
||||
self.num_images_counter.put(header['frame'], force=True)
|
||||
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img)
|
||||
|
||||
# def acq_done(self) -> DeviceStatus:
|
||||
# """
|
||||
|
||||
@@ -13,13 +13,15 @@ ZMQ_TOPIC_FILTER = b""
|
||||
|
||||
|
||||
class StdDaqPreview:
|
||||
USER_ACCESS = ["start", "stop"]
|
||||
USER_ACCESS = ["start", "stop", "image"]
|
||||
_socket = None
|
||||
_zmq_thread = None
|
||||
_shutdown_event = threading.Event()
|
||||
_throttle = 0.2
|
||||
image = None
|
||||
|
||||
def __init__(self, url: str, cb: Callable):
|
||||
self.url = url
|
||||
self._socket = None
|
||||
self._shutdown_event = threading.Event()
|
||||
self._zmq_thread = None
|
||||
self._on_update_callback = cb
|
||||
|
||||
def connect(self):
|
||||
@@ -40,6 +42,11 @@ class StdDaqPreview:
|
||||
self._socket.connect(self.url)
|
||||
|
||||
def start(self):
|
||||
# Only one consumer thread
|
||||
if self._zmq_thread:
|
||||
self.stop()
|
||||
|
||||
self._shutdown_event.clear()
|
||||
self._zmq_thread = threading.Thread(
|
||||
target=self._zmq_update_loop, daemon=True, name="StdDaq_live_preview"
|
||||
)
|
||||
@@ -49,13 +56,23 @@ class StdDaqPreview:
|
||||
self._shutdown_event.set()
|
||||
if self._zmq_thread:
|
||||
self._zmq_thread.join()
|
||||
self._zmq_thread = None
|
||||
|
||||
def _zmq_update_loop(self):
|
||||
if self._socket is None:
|
||||
self.connect()
|
||||
|
||||
t_last = time.time()
|
||||
while not self._shutdown_event.is_set():
|
||||
if self._socket is None:
|
||||
self.connect()
|
||||
try:
|
||||
self._poll()
|
||||
# pylint: disable=no-member
|
||||
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||
|
||||
# Throttle parsing and callbacks
|
||||
t_curr = time.time()
|
||||
if t_curr - t_last > self._throttle:
|
||||
self._parse_data(r)
|
||||
t_last = t_curr
|
||||
except ValueError:
|
||||
# Happens when ZMQ partially delivers the multipart message
|
||||
pass
|
||||
@@ -63,33 +80,10 @@ class StdDaqPreview:
|
||||
# Happens when receive queue is empty
|
||||
time.sleep(0.1)
|
||||
|
||||
def _poll(self):
|
||||
"""
|
||||
Poll the ZMQ socket for new data. It will throttle the data update and
|
||||
only subscribe to the topic for a single update. This is not very nice
|
||||
but it seems like there is currently no option to set the update rate on
|
||||
the backend.
|
||||
"""
|
||||
|
||||
if self._shutdown_event.wait(0.2):
|
||||
return
|
||||
|
||||
try:
|
||||
# subscribe to the topic
|
||||
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
|
||||
|
||||
# pylint: disable=no-member
|
||||
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||
self._parse_data(r)
|
||||
|
||||
finally:
|
||||
# Unsubscribe from the topic
|
||||
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
|
||||
|
||||
def _parse_data(self, data):
|
||||
# Length and throtling checks
|
||||
if len(data) != 2:
|
||||
logger.warning(f"Received malformed array of length {len(data)}")
|
||||
logger.warning(f"Received incomplete ZMQ message of length {len(data)}")
|
||||
|
||||
# Unpack the Array V1 reply to metadata and array data
|
||||
meta, img_data = data
|
||||
@@ -98,11 +92,17 @@ class StdDaqPreview:
|
||||
header = json.loads(meta)
|
||||
if header["type"] == "uint16":
|
||||
image = np.frombuffer(img_data, dtype=np.uint16)
|
||||
elif header["type"] == "uint8":
|
||||
image = np.frombuffer(img_data, dtype=np.uint8)
|
||||
else:
|
||||
raise ValueError(f"Unexpected type {header['type']}")
|
||||
if image.size != np.prod(header["shape"]):
|
||||
err = f"Unexpected array size of {image.size} for header: {header}"
|
||||
raise ValueError(err)
|
||||
image = image.reshape(header["shape"])
|
||||
logger.info(f"Live update: frame {header['frame']}")
|
||||
self._on_update_callback(image)
|
||||
# Print diadnostics and run callback
|
||||
logger.info(
|
||||
f"Live update: frame {header['frame']}\tShape: {header['shape']}\t"
|
||||
f"Mean: {np.mean(image):.3f}"
|
||||
)
|
||||
self._on_update_callback(image, header)
|
||||
|
||||
Reference in New Issue
Block a user