Fixed stage delay. Added camera name/info. Added image from image endpoint.
CI for debye_bec / test (push) Failing after 56s
CI for debye_bec / test (pull_request) Failing after 35s

This commit is contained in:
x01da
2026-06-22 11:40:56 +02:00
parent 3946f43a20
commit 5f3874f3fc
2 changed files with 47 additions and 28 deletions
@@ -70,10 +70,9 @@ xas_config:
#xrd_config:
# - !include ./x01da_xrd.yaml
# Commented out because too slow
## Hutch cameras
# hutch_cams:
# - !include ./x01da_hutch_cameras.yaml
hutch_cams:
- !include ./x01da_hutch_cameras.yaml
## Remaining experimental hutch
es_config:
+45 -25
View File
@@ -6,9 +6,13 @@ import threading
from typing import TYPE_CHECKING
import cv2
import numpy as np
import requests
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -16,7 +20,6 @@ from debye_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_lib.messages import ScanStatusMessage
logger = bec_logger.logger
@@ -28,59 +31,76 @@ CAM_PORT = 554
class HutchCam(PSIDeviceBase):
"""Class for the Hutch Cameras"""
# image = Cpt(Signal, name='image', kind='config')
USER_ACCESS = ["acquire_from_image"]
image = Cpt(Signal, name="image", kind=Kind.config)
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, scan_info=scan_info, **kwargs)
self.scan_parameters: ScanServerScanInfo = None
self.hostname = prefix
self.status = None
self.name = ""
# pylint: disable=E1101
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
Retrieve camera name which also makes sure the camera is connected.
"""
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
if not cap.isOpened():
logger.error(self, "Connection Failed", "Could not connect to the camera stream.")
return
cap.release()
info_url = f"http://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch/-wvhttp-01-/info.cgi?item=c.1.name"
response = requests.get(info_url, timeout=5)
response.raise_for_status()
self.name = response.content.decode("utf-8").split("c.1.name.utf8:=")[-1].strip().lower()
def on_stage(self) -> DeviceStatus:
"""Called while staging the device."""
self.scan_parameters = fetch_scan_info(self.scan_info)
file_path = get_full_path(self.scan_info, name="hutch_cam_" + self.hostname).removesuffix(
"h5"
)
self.status = DeviceStatus(self)
file_path = get_full_path(self.scan_info.msg, name=self.name).removesuffix("h5")
thread = threading.Thread(
target=self._save_picture, args=(file_path, self.status), daemon=True
target=self.acquire_and_save_from_video, args=(file_path,), daemon=True
)
thread.start()
return self.status
return None
def _save_picture(self, file_path, status):
def acquire_from_image(self):
"""
Acquire an image from the image endpoint of the camera. Resolution is
1280 x 720 px and cannot be changed. Acquisition is fast.
"""
logger.debug(f"Capture from camera {self.hostname}")
image_url = (
f"http://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch/-wvhttp-01-/image.cgi"
)
response = requests.get(image_url, timeout=5)
response.raise_for_status()
img = cv2.imdecode(
np.frombuffer(response.content, np.uint8), cv2.IMREAD_COLOR
) # Or IMREAD_GRAYSCALE
self.image.put(img)
def acquire_and_save_from_video(self, file_path):
"""
Acquire an image from a videostream. Videostream resolution can be changed in the camera settings.
Acquisition is slow, as opening the video stream can take a few seconds.
Args:
file_path (str): File path including filename where image will be saved.
"""
try:
logger.info(f"Capture from camera {self.hostname}")
logger.debug(f"Capture from camera {self.hostname}")
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
if not cap.isOpened():
logger.error("Connection Failed", "Could not connect to the camera stream.")
return
logger.info(f"Connection to camera {self.hostname} established")
ret, frame = cap.readAsync()
logger.debug(f"Connection to camera {self.hostname} established")
ret, frame = cap.read()
cap.release()
if not ret:
logger.error("Capture Failed", "Failed to capture image from camera.")
return
cv2.imwrite(file_path + "png", frame)
status.set_finished()
logger.info(f"Capture from camera {self.hostname} done")
logger.debug(f"Capture from camera {self.hostname} done")
except Exception as e:
status.set_exception(e)
logger.error(f"Could not acquire image from video stream: {e}")