create hutch camera class
This commit is contained in:
79
debye_bec/devices/cameras/hutch_cam.py
Normal file
79
debye_bec/devices/cameras/hutch_cam.py
Normal file
@@ -0,0 +1,79 @@
|
||||
"""EH Hutch Cameras"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import cv2
|
||||
import threading
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices import DeviceStatus
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.messages import ScanStatusMessage
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
CAM_USERNAME = "camera_user"
|
||||
CAM_PASSWORD = "camera_user1"
|
||||
CAM_PORT = 554
|
||||
|
||||
class HutchCam(PSIDeviceBase):
|
||||
"""Class for the Hutch Cameras"""
|
||||
|
||||
# image = Cpt(Signal, name='image', kind='config')
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
super().__init__(name=name, scan_info=scan_info, **kwargs)
|
||||
|
||||
self.hostname = prefix
|
||||
self.status = None
|
||||
|
||||
# pylint: disable=E1101
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
|
||||
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
|
||||
if not cap.isOpened():
|
||||
logger.error(self, "Connection Failed", "Could not connect to the camera stream.")
|
||||
return
|
||||
cap.release()
|
||||
|
||||
def on_stage(self) -> DeviceStatus:
|
||||
"""Called while staging the device."""
|
||||
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
file_path = get_full_path(scan_msg, name='hutch_cam_' + self.hostname).removesuffix('h5')
|
||||
|
||||
self.status = DeviceStatus(self)
|
||||
|
||||
thread = threading.Thread(target=self._save_picture, args=(file_path, self.status), daemon=True)
|
||||
thread.start()
|
||||
|
||||
return self.status
|
||||
|
||||
def _save_picture(self, file_path, status):
|
||||
try:
|
||||
logger.info(f'Capture from camera {self.hostname}')
|
||||
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
|
||||
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
|
||||
if not cap.isOpened():
|
||||
logger.error("Connection Failed", "Could not connect to the camera stream.")
|
||||
return
|
||||
logger.info(f'Connection to camera {self.hostname} established')
|
||||
ret, frame = cap.readAsync()
|
||||
cap.release()
|
||||
if not ret:
|
||||
logger.error("Capture Failed", "Failed to capture image from camera.")
|
||||
return
|
||||
cv2.imwrite(file_path + 'png', frame)
|
||||
status.set_finished()
|
||||
logger.info(f'Capture from camera {self.hostname} done')
|
||||
except Exception as e:
|
||||
status.set_exception(e)
|
||||
Reference in New Issue
Block a user