diff --git a/debye_bec/devices/cameras/hutch_cam.py b/debye_bec/devices/cameras/hutch_cam.py index 633b8dc..64e68f3 100644 --- a/debye_bec/devices/cameras/hutch_cam.py +++ b/debye_bec/devices/cameras/hutch_cam.py @@ -2,14 +2,17 @@ from __future__ import annotations -import cv2 import threading from typing import TYPE_CHECKING -from bec_lib.logger import bec_logger +import cv2 from bec_lib.file_utils import get_full_path -from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from bec_lib.logger import bec_logger +from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo from ophyd_devices import DeviceStatus +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + +from debye_bec.devices.utils.utils import fetch_scan_info if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo @@ -21,6 +24,7 @@ CAM_USERNAME = "camera_user" CAM_PASSWORD = "camera_user1" CAM_PORT = 554 + class HutchCam(PSIDeviceBase): """Class for the Hutch Cameras""" @@ -28,7 +32,7 @@ class HutchCam(PSIDeviceBase): def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): super().__init__(name=name, scan_info=scan_info, **kwargs) - + self.scan_parameters: ScanServerScanInfo = None self.hostname = prefix self.status = None @@ -47,33 +51,36 @@ class HutchCam(PSIDeviceBase): def on_stage(self) -> DeviceStatus: """Called while staging the device.""" - - scan_msg: ScanStatusMessage = self.scan_info.msg - file_path = get_full_path(scan_msg, name='hutch_cam_' + self.hostname).removesuffix('h5') + self.scan_parameters = fetch_scan_info(self.scan_info) + file_path = get_full_path(self.scan_info, name="hutch_cam_" + self.hostname).removesuffix( + "h5" + ) self.status = DeviceStatus(self) - thread = threading.Thread(target=self._save_picture, args=(file_path, self.status), daemon=True) + thread = threading.Thread( + target=self._save_picture, args=(file_path, self.status), daemon=True + ) thread.start() return self.status def _save_picture(self, file_path, status): try: - logger.info(f'Capture from camera {self.hostname}') + logger.info(f"Capture from camera {self.hostname}") rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1" cap = cv2.VideoCapture(f"{rtsp_url}?tcp") if not cap.isOpened(): logger.error("Connection Failed", "Could not connect to the camera stream.") return - logger.info(f'Connection to camera {self.hostname} established') + logger.info(f"Connection to camera {self.hostname} established") ret, frame = cap.readAsync() cap.release() if not ret: logger.error("Capture Failed", "Failed to capture image from camera.") return - cv2.imwrite(file_path + 'png', frame) + cv2.imwrite(file_path + "png", frame) status.set_finished() - logger.info(f'Capture from camera {self.hostname} done') + logger.info(f"Capture from camera {self.hostname} done") except Exception as e: status.set_exception(e)