diff --git a/pyproject.toml b/pyproject.toml index cc9254e..731a0d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ classifiers = [ "Programming Language :: Python :: 3", "Topic :: Scientific/Engineering", ] -dependencies = ["ophyd_devices", "bec_lib", "websockets", "pyzmq"] +dependencies = ["ophyd_devices", "bec_lib", "requests", "websockets", "pyzmq", "jinja2"] [project.optional-dependencies] dev = ["black", "isort", "coverage", "pylint", "pytest", "pytest-random-order", "ophyd_devices", "bec_server"] diff --git a/tomcat_bec/devices/gigafrost/gigafrostclient.py b/tomcat_bec/devices/gigafrost/gigafrostclient.py index a10a011..33bc86f 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostclient.py +++ b/tomcat_bec/devices/gigafrost/gigafrostclient.py @@ -108,7 +108,7 @@ class GigaFrostClient(PSIDetectorBase): custom_prepare_cls = GigaFrostClientMixin USER_ACCESS = ["kickoff"] - cam = Component(gfcam.GigaFrostCamera, prefix="X02DA-CAM-GF2:", name="cam") + # cam = Component(gfcam.GigaFrostCamera, prefix="X02DA-CAM-GF2:", name="cam") daq = Component(stddaq.StdDaqClient, name="daq") # pylint: disable=too-many-arguments @@ -124,8 +124,8 @@ class GigaFrostClient(PSIDetectorBase): kind=None, **kwargs, ): - self.__class__.__dict__["cam"].kwargs['backend_url'] = backend_url - self.__class__.__dict__["cam"].kwargs['auto_soft_enable'] = auto_soft_enable + # self.__class__.__dict__["cam"].kwargs['backend_url'] = backend_url + # self.__class__.__dict__["cam"].kwargs['auto_soft_enable'] = auto_soft_enable self.__class__.__dict__["daq"].kwargs['ws_url'] = daq_ws_url self.__class__.__dict__["daq"].kwargs['rest_url'] = daq_rest_url diff --git a/tomcat_bec/devices/gigafrost/stddaq_client.py b/tomcat_bec/devices/gigafrost/stddaq_client.py index 437acbd..3ce7bd4 100644 --- a/tomcat_bec/devices/gigafrost/stddaq_client.py +++ b/tomcat_bec/devices/gigafrost/stddaq_client.py @@ -9,181 +9,70 @@ Created on Thu Jun 27 17:28:43 2024 import json from time import sleep from threading import Thread +import requests + from ophyd import Device, Signal, Component, Kind, DeviceStatus, Staged from websockets.sync.client import connect from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError -try: - from stddaq_rest import StdDaqRestClient -except ModuleNotFoundError: - from tomcat_bec.devices.gigafrost.stddaq_rest import StdDaqRestClient + +from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase +from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin +from bec_lib import bec_logger +logger = bec_logger.logger -class StdDaqClient(Device): - """StdDaq API +class StdDaqMixin(CustomDeviceMixin): + # parent : StdDaqClient + _mon = None - This class combines the new websocket and REST interfaces of the stdDAQ that - were meant to replace the documented python client. The websocket interface - starts and stops the acquisition and provides status, while the REST - interface can read and write the configuration. The DAQ needs to restart - all services to reconfigure with a new config. + def on_stage(self) -> None: + logger.warning(self.parent.scaninfo.__dict__) - The websocket provides status updates about a running acquisition but the - interface breaks connection at the end of the run. + # Restart the DAQ if resolution changed - The standard DAQ configuration is a single JSON file locally autodeployed - to the DAQ servers (as root!!!). It can only be written through a REST API - that is semi-supported. The DAQ might be distributed across several servers, - we'll only interface with the primary REST interface will synchronize with - all secondary REST servers. In the past this was a source of problems. + reconfigure = False + if hasattr(self.parent.scaninfo, 'image_width'): + reconfigure = True + self.parent.cfg_pixel_width.set(self.parent.scaninfo.image_width).wait() + if hasattr(self.parent.scaninfo, 'image_height'): + reconfigure = True + self.parent.cfg_pixel_height.set(self.parent.scaninfo.image_height).wait() - Example: - ''' - daq = StdDaqClient(name="daq", ws_url="ws://xbl-daq-29:8080", rest_url="http://xbl-daq-29:5000") - ''' - """ - # pylint: disable=too-many-instance-attributes - USER_ACCESS=["kickoff"] + # Restart the DAQ if resolution changed + cfg = self.parent.get_daq_config() + if cfg['image_pixel_height'] != self.parent.cfg_pixel_height.get() or cfg['image_pixel_width'] != self.parent.cfg_pixel_width.get(): + self.parent.safestop() + sleep(1) + cfg = self.parent.get_daq_config() + changes = { + 'image_pixel_height': int(self.parent.cfg_pixel_height.get()), + 'image_pixel_width': int(self.parent.cfg_pixel_width.get()), + } + cfg = cfg.update(changes) + self.parent.set_daq_config(cfg) + self.parent.read_daq_config() - # Status attributes - url = Component(Signal, kind=Kind.config) - status = Component(Signal, value="unknown", kind=Kind.normal) - n_total = Component(Signal, value=10000, kind=Kind.config) - file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config) - # Configuration - config = Component(StdDaqRestClient, kind=Kind.config) + # Online configurable changes + if hasattr(self.parent.scaninfo, 'num_images'): + self.parent.num_images.set(self.parent.scaninfo.num_images).wait() + if hasattr(self.parent.scaninfo, 'daq_file_path'): + self.parent.file_path.set(self.parent.scaninfo.daq_file_path).wait() + file_path = self.parent.file_path.get() + num_images = self.parent.num_images.get() - def __init__( - self, - *args, - ws_url: str = "ws://localhost:8080", - rest_url: str = "http://localhost:5000", - parent: Device = None, - **kwargs - ) -> None: - self.__class__.__dict__['config'].kwargs['rest_url'] = rest_url - - super().__init__(*args, parent=parent, **kwargs) - self.status._metadata["write_access"] = False - self.url._metadata["write_access"] = False - self.url.set(ws_url, force=True).wait() - self._ws_url = ws_url - self._mon = None - - # Connect ro the DAQ - self._client = None - self.connect() - - def __del__(self) -> None: - self._client.close() - return super().__del__() - - def connect(self): - """Connect to the StdDAQ's websockets interface - - StdDAQ may reject connection for a few seconds after restart, or when - it wants so if it fails, wait a bit and try to connect again. - """ - num_retry = 0 - while num_retry < 5: - try: - self._client = connect(self._ws_url) - break - except ConnectionRefusedError: - num_retry += 1 - sleep(3) - if num_retry == 5: - raise ConnectionRefusedError( - "The stdDAQ websocket interface refused connection 5 times.") - - def monitor(self): - """Attach monitoring to the DAQ""" - self._client = connect(self._ws_url) - self._mon = Thread(target=self.poll, daemon=True) - self._mon.start() - - def configure(self, d: dict = None) -> tuple: - """Set the standard DAQ parameters for the next run - - Note that full reconfiguration is not possible with the websocket - interface, only changing acquisition parameters. These changes are only - activated upon staging! - - Example: - ---------- - std.configure(n_total=10000, file_path="/data/test/raw") - - Parameters - ---------- - n_total : int, optional - Total number of images to be taken during each scan. Set to -1 for - an unlimited number of images (limited by the ringbuffer size and - backend speed). (default = 10000) - file_path : string, optional - Save file path. (default = '/gpfs/test/test-beamline') - """ - old_config = self.read_configuration() - - if d is not None: - # Set acquisition parameters - if 'n_total' in d: - self.n_total.set(int(d['n_total'])) - if 'ntotal' in d: - self.n_total.set(int(d['ntotal'])) - if 'file_path' in d: - self.output_file.set(str(d['file_path'])) - # Configure DAQ - if 'pixel_width' in d or 'pixel_height' in d: - # Safe stop before configure (see 'reset') - self.reset() - self.config.configure(d) - - new_config = self.read_configuration() - return (old_config, new_config) - - - def reset(self): - """ - - The current stdDAQ refuses connection if another session is running. This is safety so - we don't accidentally kill a running exposure. But this also means that we have to wait - until a dead session dies of timeout. - - NOTE: REST reconfiguration restarts with systemd and can corrupt currently written files. - """ - try: - if self._client is not None: - self._client.close() - self._client = connect(self._ws_url) - msg = json.dumps({"command": "stop"}) - self._client.send(msg) - except (ConnectionClosedError, ConnectionClosedOK, ConnectionRefusedError): - pass - self._staged = Staged.no - sleep(1) - - def stage(self) -> list: - """Start a new run with the standard DAQ - - Behavior: the StdDAQ can stop the previous run either by itself or - by calling unstage. So it might start from an already running state or - not, we can't query if not running. - """ - if self._staged: - self._client.close() - - file_path = self.file_path.get() - n_total = self.n_total.get() - - message = {"command": "start", "path": file_path, "n_image": n_total} + # Try to start a new run + message = {"command": "start", "path": file_path, "n_image": num_images} ii = 0 + self.parent.connect() while True: - reply = self.message(message) + reply = self.parent.message(message) if reply is not None: reply = json.loads(reply) - self.status.put(reply["status"], force=True) + self.parent.status.set(reply["status"], force=True).wait() + logger.info(f"[{self.parent.name}] Start DAq reply: {reply['status']}") # Give it more time to reconfigure if reply["status"] in ("rejected"): sleep(2) @@ -196,76 +85,44 @@ class StdDaqClient(Device): raise RuntimeError( f"Start StdDAQ command rejected (might be already running): {reply['reason']}" ) - + # And start status monitoring self._mon = Thread(target=self.poll, daemon=True) self._mon.start() - return super().stage() - def unstage(self): - """ Stop a running acquisition - - WARN: This will also close the connection!!! + def on_unstage(self) -> None: + """ Stop a running acquisition and close connection """ # The poller thread locks recv raising a RuntimeError try: + if self.parent._wsclient is None: + self.parent.connect() message = {"command": "stop"} - self.message(message, wait_reply=False) + self.parent.message(message, wait_reply=False) except RuntimeError: pass - self._client.close() - return super().unstage() - - def kickoff(self) -> DeviceStatus: - """ The DAQ was not meant to quickly toggle""" - return DeviceStatus(self, done=True, success=True, settle_time=0.1) - - def stop(self, *, success=False): - """ Stop a running acquisition - - WARN: This will also close the connection!!! - """ - self.unstage() - - def message(self, message: dict, timeout=1, wait_reply=True): - """Send a message to the StdDAQ and receive a reply - - Note: finishing acquisition means StdDAQ will close connection, so - there's no idle state polling. - """ - if isinstance(message, dict): - msg = json.dumps(message) - else: - msg = str(message) - - # Send message (reopen connection if needed) - try: - self._client.send(msg) - except (ConnectionClosedError, ConnectionClosedOK): - self.connect() - self._client.send(msg) - - # Wait for reply - reply = None - if wait_reply: + finally: try: - reply = self._client.recv(timeout) - return reply - except (ConnectionClosedError, ConnectionClosedOK, TimeoutError) as ex: - print(ex) - return reply + self.parent._wsclient.close() + except TypeError: + # Already closed + pass - def poll(self): - """Monitor status messages until connection is open + def on_stop(self): + """ Stop a running acquisition and close connection + """ + return self.parent.unstage() - This will block the reply monitoring to calling unstage() might throw. - Status updates are sent every 1 seconds + def poll(self) -> None: + """ Monitor status messages while connection is open. This will block the reply monitoring + to calling unstage() might throw. Status updates are sent every 1 seconds, but finishing + acquisition means StdDAQ will close connection, so there's no idle state polling. """ try: - sleep(1.2) - for msg in self._client: + sleep(0.2) + for msg in self.parent._wsclient: try: message = json.loads(msg) - self.status.put(message["status"], force=True) + self.parent.status.put(message["status"], force=True) except (ConnectionClosedError, ConnectionClosedOK): return except Exception as ex: @@ -280,6 +137,173 @@ class StdDaqClient(Device): self._mon = None +class StdDaqClient(PSIDeviceBase): + """StdDaq API + + This class combines the new websocket and REST interfaces of the stdDAQ replaced the documented + python client. The websocket interface starts and stops the acquisition and provides status, + while the REST interface can read and write the JSON configuration file. + + The DAQ needs to restart all services to reconfigure with a new config, which might corrupt + the currently written files (fix is underway). + + Example: + ``` + daq = StdDaqClient(name="daq", ws_url="ws://xbl-daq-29:8080", rest_url="http://xbl-daq-29:5000") + ``` + """ + # pylint: disable=too-many-instance-attributes + custom_prepare_cls = StdDaqMixin + USER_ACCESS = ["set_daq_config", "get_daq_config", "safestop", "restart"] + _wsclient = None + + # Status attributes + ws_url = Component(Signal, kind=Kind.config) + status = Component(Signal, value="unknown", kind=Kind.normal) + num_images = Component(Signal, value=10000, kind=Kind.config) + file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config) + # Configuration attributes + rest_url = Component(Signal, kind=Kind.config) + cfg_detector_name = Component(Signal, kind=Kind.config) + cfg_detector_type = Component(Signal, kind=Kind.config) + cfg_bit_depth = Component(Signal, kind=Kind.config) + cfg_pixel_height = Component(Signal, kind=Kind.config) + cfg_pixel_width = Component(Signal, kind=Kind.config) + + def __init__( + self, + *args, + ws_url: str = "ws://localhost:8080", + rest_url: str = "http://localhost:5000", + parent: Device = None, + **kwargs + ) -> None: + super().__init__(*args, parent=parent, **kwargs) + self.status._metadata["write_access"] = False + self.ws_url._metadata["write_access"] = False + self.ws_url.set(ws_url, force=True).wait() + self.rest_url._metadata["write_access"] = False + self.rest_url.set(rest_url, force=True).wait() + self._mon = None + + # Connect ro the DAQ and initialize values + try: + self.read_daq_config() + except Exception as ex: + logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}") + + def __del__(self) -> None: + self._wsclient.close() + return super().__del__() + + def connect(self): + """Connect to the StdDAQ's websockets interface + + StdDAQ may reject connection for a few seconds after restart, or when + it wants so if it fails, wait a bit and try to connect again. + """ + num_retry = 0 + while num_retry < 5: + try: + self._wsclient = connect(self.ws_url.get()) + break + except ConnectionRefusedError: + num_retry += 1 + sleep(3) + if num_retry == 5: + raise ConnectionRefusedError( + "The stdDAQ websocket interface refused connection 5 times.") + + def message(self, message: dict, timeout=1, wait_reply=True): + """Send a message to the StdDAQ and receive a reply + + Note: finishing acquisition means StdDAQ will close connection, so + there's no idle state polling. + """ + # Send message (reopen connection if needed) + logger.warning(self._wsclient.__dict__) + try: + msg = json.dumps(message) if isinstance(message, dict) else str(message) + self._wsclient.send(msg) + except (ConnectionClosedError, ConnectionClosedOK) as ex: + print(ex) + # self.connect() + self._wsclient.send(msg) + + # Wait for reply + reply = None + if wait_reply: + try: + reply = self._wsclient.recv(timeout) + return reply + except (ConnectionClosedError, ConnectionClosedOK, TimeoutError) as ex: + print(ex) + return reply + + def get_daq_config(self) -> dict: + """Read the current configuration from the DAQ + """ + r = requests.get( + self.rest_url.get() + '/api/config/get', + params={'user': "ioc"}, + timeout=2) + if r.status_code != 200: + raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}") + return r.json() + + def read_daq_config(self) -> dict: + """Read the current configuration from the DAQ and update the ophyd device + """ + cfg = self.get_daq_config() + self.cfg_detector_name.set(cfg['detector_name']).wait() + self.cfg_detector_type.set(cfg['detector_type']).wait() + self.cfg_bit_depth.set(cfg['bit_depth']).wait() + self.cfg_pixel_height.set(cfg['image_pixel_height']).wait() + self.cfg_pixel_width.set(cfg['image_pixel_width']).wait() + return cfg + + def set_daq_config(self, config): + """Write a full configuration to the DAQ + """ + r = requests.post( + self.rest_url.get() + '/api/config/set', + params={"user": "ioc"}, + json=config, + timeout=2, + headers={"Content-Type": "application/json"} + ) + if r.status_code != 200: + raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}") + return r.json() + + def restart(self): + """ Reconfigures the stdDAQ to restart the services + """ + cfg = self.get_daq_config() + self.set_daq_config(cfg) + + + def safestop(self): + """ + + The current stdDAQ refuses connection if another session is running. This is safety so + we don't accidentally kill a running exposure. But this also means that we have to wait + until a dead session dies of timeout. + + NOTE: REST reconfiguration restarts with systemd and can corrupt currently written files. + """ + try: + if self._wsclient is not None: + self._wsclient.close() + self._wsclient = connect(self._ws_url) + msg = json.dumps({"command": "stop"}) + self._wsclient.send(msg) + except (ConnectionClosedError, ConnectionClosedOK, ConnectionRefusedError): + pass + self._staged = Staged.no + sleep(1) + + # Automatically connect to microXAS testbench if directly invoked if __name__ == "__main__": daq = StdDaqClient(name="daq", ws_url="ws://xbl-daq-29:8080", rest_url="http://xbl-daq-29:5000") diff --git a/tomcat_bec/devices/gigafrost/stddaq_rest.py b/tomcat_bec/devices/gigafrost/stddaq_rest.py index d8abfb4..47f3ccb 100644 --- a/tomcat_bec/devices/gigafrost/stddaq_rest.py +++ b/tomcat_bec/devices/gigafrost/stddaq_rest.py @@ -7,15 +7,18 @@ Created on Thu Jun 27 17:28:43 2024 @author: mohacsi_i """ from time import sleep -from ophyd import Device, Signal, Component, Kind import requests +from ophyd import Device, Signal, Component, Kind +from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase +from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin +from bec_lib import bec_logger +logger = bec_logger.logger + + + + + -try: - from bec_lib import bec_logger - logger = bec_logger.logger -except ModuleNotFoundError: - import logging - logger = logging.getLogger("GfCam") class StdDaqRestClient(Device): @@ -57,9 +60,10 @@ class StdDaqRestClient(Device): cfg_module_positions = Component(Signal, kind=Kind.config) def __init__( - self, *args, rest_url: str = "http://localhost:5000", parent: Device = None, **kwargs + self, *args, rest_url: str = "http://localhost:5000", camera_name="gigafrost", parent: Device = None, **kwargs ) -> None: super().__init__(*args, parent=parent, **kwargs) + self.camera_name = camera_name self.rest_url._metadata["write_access"] = False self.rest_url.put(rest_url, force=True)