refactor(hutch-cam): migrate hutch cam to scans v4

This commit is contained in:
2026-05-22 10:16:06 +02:00
parent 359ef0b6d7
commit 78d58ad26f
+19 -12
View File
@@ -2,14 +2,17 @@
from __future__ import annotations
import cv2
import threading
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
import cv2
from bec_lib.file_utils import get_full_path
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from debye_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
@@ -21,6 +24,7 @@ CAM_USERNAME = "camera_user"
CAM_PASSWORD = "camera_user1"
CAM_PORT = 554
class HutchCam(PSIDeviceBase):
"""Class for the Hutch Cameras"""
@@ -28,7 +32,7 @@ class HutchCam(PSIDeviceBase):
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, scan_info=scan_info, **kwargs)
self.scan_parameters: ScanServerScanInfo = None
self.hostname = prefix
self.status = None
@@ -47,33 +51,36 @@ class HutchCam(PSIDeviceBase):
def on_stage(self) -> DeviceStatus:
"""Called while staging the device."""
scan_msg: ScanStatusMessage = self.scan_info.msg
file_path = get_full_path(scan_msg, name='hutch_cam_' + self.hostname).removesuffix('h5')
self.scan_parameters = fetch_scan_info(self.scan_info)
file_path = get_full_path(self.scan_info, name="hutch_cam_" + self.hostname).removesuffix(
"h5"
)
self.status = DeviceStatus(self)
thread = threading.Thread(target=self._save_picture, args=(file_path, self.status), daemon=True)
thread = threading.Thread(
target=self._save_picture, args=(file_path, self.status), daemon=True
)
thread.start()
return self.status
def _save_picture(self, file_path, status):
try:
logger.info(f'Capture from camera {self.hostname}')
logger.info(f"Capture from camera {self.hostname}")
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
if not cap.isOpened():
logger.error("Connection Failed", "Could not connect to the camera stream.")
return
logger.info(f'Connection to camera {self.hostname} established')
logger.info(f"Connection to camera {self.hostname} established")
ret, frame = cap.readAsync()
cap.release()
if not ret:
logger.error("Capture Failed", "Failed to capture image from camera.")
return
cv2.imwrite(file_path + 'png', frame)
cv2.imwrite(file_path + "png", frame)
status.set_finished()
logger.info(f'Capture from camera {self.hostname} done')
logger.info(f"Capture from camera {self.hostname} done")
except Exception as e:
status.set_exception(e)