diff --git a/tomcat_bec/device_configs/microxas_test_bed.yaml b/tomcat_bec/device_configs/microxas_test_bed.yaml index 4606e0f..67d74e3 100644 --- a/tomcat_bec/device_configs/microxas_test_bed.yaml +++ b/tomcat_bec/device_configs/microxas_test_bed.yaml @@ -126,6 +126,7 @@ gfcam: prefix: 'X02DA-CAM-GF2:' backend_url: 'http://sls-daq-001:8080' auto_soft_enable: true + std_daq_live: 'tcp://129.129.95.111:20000' deviceTags: - camera - trigger @@ -146,13 +147,13 @@ gfdaq: deviceTags: - std-daq - gfcam - enabled: true + enabled: false onFailure: buffer readOnly: false readoutPriority: monitored softwareTrigger: false -daq_stream0: +gf_stream0: description: stdDAQ preview (2 every 555) deviceClass: tomcat_bec.devices.StdDaqPreviewDetector deviceConfig: @@ -160,56 +161,41 @@ daq_stream0: deviceTags: - std-daq - gfcam - enabled: true + enabled: false onFailure: buffer readOnly: false readoutPriority: monitored softwareTrigger: false -daq_stream1: - description: stdDAQ preview (1 at 5 Hz) - deviceClass: tomcat_bec.devices.StdDaqPreviewDetector - deviceConfig: - url: 'tcp://129.129.95.111:20001' - deviceTags: - - std-daq - - gfcam - enabled: true - onFailure: buffer - readOnly: false - readoutPriority: monitored - softwareTrigger: false +# pcocam: +# description: PCO.edge camera client +# deviceClass: tomcat_bec.devices.PcoEdge5M +# deviceConfig: +# prefix: 'X02DA-CCDCAM2:' +# deviceTags: +# - camera +# - trigger +# - pcocam +# enabled: true +# onFailure: buffer +# readOnly: false +# readoutPriority: monitored +# softwareTrigger: true - -pcocam: - description: PCO.edge camera client - deviceClass: tomcat_bec.devices.PcoEdge5M - deviceConfig: - prefix: 'X02DA-CCDCAM2:' - deviceTags: - - camera - - trigger - - pcocam - enabled: true - onFailure: buffer - readOnly: false - readoutPriority: monitored - softwareTrigger: true - -pcodaq: - description: GigaFrost stdDAQ client - deviceClass: tomcat_bec.devices.StdDaqClient - deviceConfig: - ws_url: 'ws://129.129.95.111:8081' - rest_url: 'http://129.129.95.111:5010' - deviceTags: - - std-daq - - pcocam - enabled: true - onFailure: buffer - readOnly: false - readoutPriority: monitored - softwareTrigger: false +# pcodaq: +# description: GigaFrost stdDAQ client +# deviceClass: tomcat_bec.devices.StdDaqClient +# deviceConfig: +# ws_url: 'ws://129.129.95.111:8081' +# rest_url: 'http://129.129.95.111:5010' +# deviceTags: +# - std-daq +# - pcocam +# enabled: true +# onFailure: buffer +# readOnly: false +# readoutPriority: monitored +# softwareTrigger: false pco_stream0: description: stdDAQ preview (2 every 555) diff --git a/tomcat_bec/devices/gigafrost/gigafrost_base.py b/tomcat_bec/devices/gigafrost/gigafrost_base.py index 232af1c..3bc105f 100644 --- a/tomcat_bec/devices/gigafrost/gigafrost_base.py +++ b/tomcat_bec/devices/gigafrost/gigafrost_base.py @@ -59,6 +59,7 @@ class GigaFrostBase(Device): file_path = Cpt(Signal, kind=Kind.config, value="") file_prefix = Cpt(Signal, kind=Kind.config, value="") num_images = Cpt(Signal, kind=Kind.config, value=1) + num_images_counter = Cpt(Signal, kind=Kind.hinted, value=0) # GF specific interface acquire_block = Cpt(Signal, kind=Kind.config, value=0) diff --git a/tomcat_bec/devices/gigafrost/gigafrostcamera.py b/tomcat_bec/devices/gigafrost/gigafrostcamera.py index 9f76cc5..29b7363 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostcamera.py +++ b/tomcat_bec/devices/gigafrost/gigafrostcamera.py @@ -484,8 +484,9 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): self.backend.shutdown() super().destroy() - # def _on_preview_update(self, img:np.ndarray): - # self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img) + def _on_preview_update(self, img:np.ndarray, header: dict): + self.num_images_counter.put(header['frame'], force=True) + self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img) # def acq_done(self) -> DeviceStatus: # """ diff --git a/tomcat_bec/devices/gigafrost/std_daq_preview.py b/tomcat_bec/devices/gigafrost/std_daq_preview.py index fdd3d34..c519785 100644 --- a/tomcat_bec/devices/gigafrost/std_daq_preview.py +++ b/tomcat_bec/devices/gigafrost/std_daq_preview.py @@ -13,13 +13,15 @@ ZMQ_TOPIC_FILTER = b"" class StdDaqPreview: - USER_ACCESS = ["start", "stop"] + USER_ACCESS = ["start", "stop", "image"] + _socket = None + _zmq_thread = None + _shutdown_event = threading.Event() + _throttle = 0.2 + image = None def __init__(self, url: str, cb: Callable): self.url = url - self._socket = None - self._shutdown_event = threading.Event() - self._zmq_thread = None self._on_update_callback = cb def connect(self): @@ -40,6 +42,11 @@ class StdDaqPreview: self._socket.connect(self.url) def start(self): + # Only one consumer thread + if self._zmq_thread: + self.stop() + + self._shutdown_event.clear() self._zmq_thread = threading.Thread( target=self._zmq_update_loop, daemon=True, name="StdDaq_live_preview" ) @@ -49,13 +56,23 @@ class StdDaqPreview: self._shutdown_event.set() if self._zmq_thread: self._zmq_thread.join() + self._zmq_thread = None def _zmq_update_loop(self): + if self._socket is None: + self.connect() + + t_last = time.time() while not self._shutdown_event.is_set(): - if self._socket is None: - self.connect() try: - self._poll() + # pylint: disable=no-member + r = self._socket.recv_multipart(flags=zmq.NOBLOCK) + + # Throttle parsing and callbacks + t_curr = time.time() + if t_curr - t_last > self._throttle: + self._parse_data(r) + t_last = t_curr except ValueError: # Happens when ZMQ partially delivers the multipart message pass @@ -63,33 +80,10 @@ class StdDaqPreview: # Happens when receive queue is empty time.sleep(0.1) - def _poll(self): - """ - Poll the ZMQ socket for new data. It will throttle the data update and - only subscribe to the topic for a single update. This is not very nice - but it seems like there is currently no option to set the update rate on - the backend. - """ - - if self._shutdown_event.wait(0.2): - return - - try: - # subscribe to the topic - self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER) - - # pylint: disable=no-member - r = self._socket.recv_multipart(flags=zmq.NOBLOCK) - self._parse_data(r) - - finally: - # Unsubscribe from the topic - self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER) - def _parse_data(self, data): # Length and throtling checks if len(data) != 2: - logger.warning(f"Received malformed array of length {len(data)}") + logger.warning(f"Received incomplete ZMQ message of length {len(data)}") # Unpack the Array V1 reply to metadata and array data meta, img_data = data @@ -98,11 +92,17 @@ class StdDaqPreview: header = json.loads(meta) if header["type"] == "uint16": image = np.frombuffer(img_data, dtype=np.uint16) + elif header["type"] == "uint8": + image = np.frombuffer(img_data, dtype=np.uint8) else: raise ValueError(f"Unexpected type {header['type']}") if image.size != np.prod(header["shape"]): err = f"Unexpected array size of {image.size} for header: {header}" raise ValueError(err) image = image.reshape(header["shape"]) - logger.info(f"Live update: frame {header['frame']}") - self._on_update_callback(image) + # Print diadnostics and run callback + logger.info( + f"Live update: frame {header['frame']}\tShape: {header['shape']}\t" + f"Mean: {np.mean(image):.3f}" + ) + self._on_update_callback(image, header)