From a8990f8de2bf7ff3643323bcc093ef7d153ea6ec Mon Sep 17 00:00:00 2001 From: gac-x06da Date: Wed, 22 Jan 2025 17:42:50 +0100 Subject: [PATCH] axis client getting ready --- pxiii_bec/devices/SmarGon.py | 212 ++++++++++++++++++----------------- 1 file changed, 112 insertions(+), 100 deletions(-) diff --git a/pxiii_bec/devices/SmarGon.py b/pxiii_bec/devices/SmarGon.py index 9888725..489e613 100644 --- a/pxiii_bec/devices/SmarGon.py +++ b/pxiii_bec/devices/SmarGon.py @@ -1,5 +1,5 @@ import time -from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind, Device, Signal +from ophyd import Component, Device, Kind, Device, Signal, SignalRO from ophyd.status import SubscriptionStatus import requests @@ -9,121 +9,101 @@ try: logger = bec_logger.logger except ModuleNotFoundError: import logging - logger = logging.getLogger("A3200") + logger = logging.getLogger("SmarGon") class SmarGonSignal(Signal): - """Small helper class to read PVs that need to be processed first.""" + """ SmarGonSignal (R/W) + + Small helper class to read/write parameters from SmarGon. As there is no + motion status readback from smargopolo, this should be substituted with + setting with 'settle_time'. + """ - def __init__(self, prefix, *args, **kwargs): + def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs): super().__init__(*args, **kwargs) - self.prefix = prefix + self.write_addr = write_addr self.addr = self.parent.name + self._limits = (low_limit, high_limit) # self.get() - def put(self, value, *args, **kwargs): - self._go_n_put(f"target{self.prefix}?{self.addr}={value}") - return super().put(value, *args, force=True, **kwargs) + def put(self, value, *args, timestamp=None, **kwargs): + """ Overriden put to add communication with smargopolo""" + # Validate new value + self.check_value(value) + + if timestamp is None: + timestamp = time.time() + + # Perform the actual write to SmargoPolo + r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}") + + old_value = self._readback + self._timestamp = timestamp + self._readback = r[self.addr.upper()] + self.value = r[self.addr.upper()] + + # Notify subscribers + self._run_subs(sub_type=self.SUB_VALUE, old_value=old_value, + value=value, timestamp=self._timestamp) + + @property + def limits(self): + return self._limits + + def check_value(self, value, **kwargs): + """ Check if value falls within limits""" + lol = self.limits[0] + if lol is not None: + if value < lol: + raise ValueError(f"Target {value} outside of limits {self.limits}") + hil = self.limits[1] + if hil is not None: + if value > hil: + raise ValueError(f"Target {value} outside of limits {self.limits}") def get(self, *args, **kwargs): - r = self._go_n_get(f"target{self.prefix}") + r = self.parent._go_n_get(self.write_addr) print(r) self.value = r[self.addr.upper()] return super().get(*args, **kwargs) - def _go_n_get(self, name, **kwargs): - cmd = f"{self.parent.sg_url.get()}/{name}" - r = requests.get(cmd, timeout=1) - if not r.ok: - raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}") - return r.json() - - def _go_n_put(self, name, **kwargs): - cmd = f"{self.parent.sg_url.get()}/{name}" - r = requests.put(cmd, timeout=1) - if not r.ok: - raise RuntimeError(f"[self.name] Error putting {name}; server returned {r.status_code} => {r.reason}") - - - - class SmarGonSignalRO(Signal): - """Small helper class to read PVs that need to be processed first. - - TODO: Add monitoring + """ Small helper class for read-only parameters PVs from SmarGon. + + TODO: Add monitoring """ - def __init__(self, prefix, *args, **kwargs): + def __init__(self, *args, read_addr="readbackSCS", **kwargs): super().__init__(*args, **kwargs) self._metadata["write_access"] = False - self.prefix = prefix + self.read_addr = read_addr self.addr = self.parent.name def get(self, *args, **kwargs): - r = self._go_n_get(f"readback{self.prefix}") + r = self.parent._go_n_get(self.read_addr) print(r) self.put(r[self.addr.upper()], force=True) return super().get(*args, **kwargs) - def _go_n_get(self, name, **kwargs): - cmd = f"{self.parent.sg_url.get()}/{name}" - r = requests.get(cmd, timeout=1) - if not r.ok: - raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}") - return r.json() +# class SmarGonMovingSignalRO(Signal): +# """Small helper class to read PVs that need to be processed first. +# TODO: Add monitoring +# """ -class SmarGonClient(Device): - """SmarGon client deice - - This class controls the SmarGon goniometer via the REST interface. - """ - # pylint: disable=too-many-instance-attributes - USER_ACCESS = ["set_daq_config", "get_daq_config", "nuke", "connect", "message", "state", "bluestage", "blueunstage"] - - # Status attributes - sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False}) - - # Axis parameters - shx = Component(SmarGonSignal, group="SCS", addr="shx", kind=Kind.config) - # shy = Component(SmarGonSignal, group="SCS", addr="shy", kind=Kind.config) - # shz = Component(SmarGonSignal, group="SCS", addr="shz", kind=Kind.config) - # chi = Component(SmarGonSignal, group="SCS", addr="chi", kind=Kind.config) - # phi = Component(SmarGonSignal, group="SCS", addr="phi", kind=Kind.config) - - def __init__( - self, - prefix="", - *, - name, - kind=None, - read_attrs=None, - configuration_attrs=None, - parent=None, - device_manager=None, - sg_url: str = "http://x06da-smargopolo.psi.ch:3000", - **kwargs, - ) -> None: - # super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs) - - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) - self.sg_url._metadata["write_access"] = False - self.sg_url.set(sg_url, force=True).wait() - - def _go_n_get(self, name, **kwargs): - cmd = f"{self.sg_url.get()}/{name}" - r = requests.get(cmd, timeout=1) - if not r.ok: - raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}") - return r.json() - - def _go_n_put(self, name, **kwargs): - cmd = f"{self.sg_url.get()}/{name}" - r = requests.put(cmd, timeout=1) - if not r.ok: - raise RuntimeError(f"[self.name] Error putting {name}; server returned {r.status_code} => {r.reason}") +# def __init__(self, *args, **kwargs): +# super().__init__(*args, **kwargs) +# self._metadata["write_access"] = False +# def get(self, *args, **kwargs): +# r = self.parent._go_n_get("readbackMCS") +# print(r['state']) +# moving_str = r["state"]["q1"] + r["state"]["q2"] + r["state"]["q3"] + r["state"]["q4"] + r["state"]["q5"][0] +# moving = int(moving_str) +# self.put(moving_str, force=True) +# return super().get(*args, **kwargs) class SmarGonAxis(Device): @@ -133,10 +113,15 @@ class SmarGonAxis(Device): """ # Status attributes sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False}) + corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config) + mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config) # Axis parameters - readback = Component(SmarGonSignalRO, kind=Kind.config) - setpoint = Component(SmarGonSignal, kind=Kind.config) + readback = Component(SmarGonSignalRO, kind=Kind.hinted) + setpoint = Component(SmarGonSignal, kind=Kind.normal) + done = Component(SignalRO, value=1, kind=Kind.normal) + # moving = Component(SmarGonMovingSignalRO, kind=Kind.config) + moving = 1 def __init__( self, @@ -149,26 +134,53 @@ class SmarGonAxis(Device): parent=None, device_manager=None, sg_url: str = "http://x06da-smargopolo.psi.ch:3000", + low_limit = None, + high_limit = None, **kwargs, ) -> None: - self.__class__.__dict__['readback'].kwargs['prefix'] = prefix - self.__class__.__dict__['readback'].kwargs['name'] = name - self.__class__.__dict__['setpoint'].kwargs['prefix'] = prefix - self.__class__.__dict__['setpoint'].kwargs['name'] = name - + self.__class__.__dict__['readback'].kwargs['read_addr'] = f"readback{prefix}" + self.__class__.__dict__['setpoint'].kwargs['write_addr'] = f"target{prefix}" + self.__class__.__dict__['setpoint'].kwargs['low_limit'] = low_limit + self.__class__.__dict__['setpoint'].kwargs['high_limit'] = high_limit # super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs) - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) self.sg_url._metadata["write_access"] = False self.sg_url.set(sg_url, force=True).wait() + def initialize(self): + """Helper function for initial readings""" + # self.corr.get() + # self.mode.get() + r = self._go_n_get("corr_type") + print(r) + + def _go_n_get(self, address, **kwargs): + """Helper function to connect to smargopolo""" + cmd = f"{self.sg_url.get()}/{address}" + r = requests.get(cmd, timeout=1) + if not r.ok: + raise RuntimeError(f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}") + return r.json() + + def _go_n_put(self, address, **kwargs): + """Helper function to connect to smargopolo""" + cmd = f"{self.sg_url.get()}/{address}" + r = requests.put(cmd, timeout=1) + if not r.ok: + raise RuntimeError(f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}") + return r.json() + + + + + if __name__ == "__main__": - shz = SmarGonAxis(prefix="SCS", name="shz", sg_url="http://x06da-smargopolo.psi.ch:3000") - sg = SmarGonClient(prefix="X06DA-ES", name="smargon") - sg.wait_for_connection() - - - + shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000") + shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000") + shz = SmarGonAxis(prefix="SCS", name="shz", low_limit=10, high_limit=22, sg_url="http://x06da-smargopolo.psi.ch:3000") + shx.wait_for_connection() + shy.wait_for_connection() + shz.wait_for_connection()