axis client getting ready

This commit is contained in:
gac-x06da
2025-01-22 17:42:50 +01:00
parent 8da2ed4102
commit a8990f8de2
+112 -100
View File
@@ -1,5 +1,5 @@
import time import time
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind, Device, Signal from ophyd import Component, Device, Kind, Device, Signal, SignalRO
from ophyd.status import SubscriptionStatus from ophyd.status import SubscriptionStatus
import requests import requests
@@ -9,121 +9,101 @@ try:
logger = bec_logger.logger logger = bec_logger.logger
except ModuleNotFoundError: except ModuleNotFoundError:
import logging import logging
logger = logging.getLogger("A3200") logger = logging.getLogger("SmarGon")
class SmarGonSignal(Signal): class SmarGonSignal(Signal):
"""Small helper class to read PVs that need to be processed first.""" """ SmarGonSignal (R/W)
Small helper class to read/write parameters from SmarGon. As there is no
motion status readback from smargopolo, this should be substituted with
setting with 'settle_time'.
"""
def __init__(self, prefix, *args, **kwargs): def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.prefix = prefix self.write_addr = write_addr
self.addr = self.parent.name self.addr = self.parent.name
self._limits = (low_limit, high_limit)
# self.get() # self.get()
def put(self, value, *args, **kwargs): def put(self, value, *args, timestamp=None, **kwargs):
self._go_n_put(f"target{self.prefix}?{self.addr}={value}") """ Overriden put to add communication with smargopolo"""
return super().put(value, *args, force=True, **kwargs) # Validate new value
self.check_value(value)
if timestamp is None:
timestamp = time.time()
# Perform the actual write to SmargoPolo
r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}")
old_value = self._readback
self._timestamp = timestamp
self._readback = r[self.addr.upper()]
self.value = r[self.addr.upper()]
# Notify subscribers
self._run_subs(sub_type=self.SUB_VALUE, old_value=old_value,
value=value, timestamp=self._timestamp)
@property
def limits(self):
return self._limits
def check_value(self, value, **kwargs):
""" Check if value falls within limits"""
lol = self.limits[0]
if lol is not None:
if value < lol:
raise ValueError(f"Target {value} outside of limits {self.limits}")
hil = self.limits[1]
if hil is not None:
if value > hil:
raise ValueError(f"Target {value} outside of limits {self.limits}")
def get(self, *args, **kwargs): def get(self, *args, **kwargs):
r = self._go_n_get(f"target{self.prefix}") r = self.parent._go_n_get(self.write_addr)
print(r) print(r)
self.value = r[self.addr.upper()] self.value = r[self.addr.upper()]
return super().get(*args, **kwargs) return super().get(*args, **kwargs)
def _go_n_get(self, name, **kwargs):
cmd = f"{self.parent.sg_url.get()}/{name}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}")
return r.json()
def _go_n_put(self, name, **kwargs):
cmd = f"{self.parent.sg_url.get()}/{name}"
r = requests.put(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error putting {name}; server returned {r.status_code} => {r.reason}")
class SmarGonSignalRO(Signal): class SmarGonSignalRO(Signal):
"""Small helper class to read PVs that need to be processed first. """ Small helper class for read-only parameters PVs from SmarGon.
TODO: Add monitoring TODO: Add monitoring
""" """
def __init__(self, prefix, *args, **kwargs): def __init__(self, *args, read_addr="readbackSCS", **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._metadata["write_access"] = False self._metadata["write_access"] = False
self.prefix = prefix self.read_addr = read_addr
self.addr = self.parent.name self.addr = self.parent.name
def get(self, *args, **kwargs): def get(self, *args, **kwargs):
r = self._go_n_get(f"readback{self.prefix}") r = self.parent._go_n_get(self.read_addr)
print(r) print(r)
self.put(r[self.addr.upper()], force=True) self.put(r[self.addr.upper()], force=True)
return super().get(*args, **kwargs) return super().get(*args, **kwargs)
def _go_n_get(self, name, **kwargs):
cmd = f"{self.parent.sg_url.get()}/{name}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}")
return r.json()
# class SmarGonMovingSignalRO(Signal):
# """Small helper class to read PVs that need to be processed first.
# TODO: Add monitoring
# """
class SmarGonClient(Device): # def __init__(self, *args, **kwargs):
"""SmarGon client deice # super().__init__(*args, **kwargs)
# self._metadata["write_access"] = False
This class controls the SmarGon goniometer via the REST interface.
"""
# pylint: disable=too-many-instance-attributes
USER_ACCESS = ["set_daq_config", "get_daq_config", "nuke", "connect", "message", "state", "bluestage", "blueunstage"]
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
# Axis parameters
shx = Component(SmarGonSignal, group="SCS", addr="shx", kind=Kind.config)
# shy = Component(SmarGonSignal, group="SCS", addr="shy", kind=Kind.config)
# shz = Component(SmarGonSignal, group="SCS", addr="shz", kind=Kind.config)
# chi = Component(SmarGonSignal, group="SCS", addr="chi", kind=Kind.config)
# phi = Component(SmarGonSignal, group="SCS", addr="phi", kind=Kind.config)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
**kwargs,
) -> None:
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self.sg_url._metadata["write_access"] = False
self.sg_url.set(sg_url, force=True).wait()
def _go_n_get(self, name, **kwargs):
cmd = f"{self.sg_url.get()}/{name}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}")
return r.json()
def _go_n_put(self, name, **kwargs):
cmd = f"{self.sg_url.get()}/{name}"
r = requests.put(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error putting {name}; server returned {r.status_code} => {r.reason}")
# def get(self, *args, **kwargs):
# r = self.parent._go_n_get("readbackMCS")
# print(r['state'])
# moving_str = r["state"]["q1"] + r["state"]["q2"] + r["state"]["q3"] + r["state"]["q4"] + r["state"]["q5"][0]
# moving = int(moving_str)
# self.put(moving_str, force=True)
# return super().get(*args, **kwargs)
class SmarGonAxis(Device): class SmarGonAxis(Device):
@@ -133,10 +113,15 @@ class SmarGonAxis(Device):
""" """
# Status attributes # Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False}) sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config)
mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config)
# Axis parameters # Axis parameters
readback = Component(SmarGonSignalRO, kind=Kind.config) readback = Component(SmarGonSignalRO, kind=Kind.hinted)
setpoint = Component(SmarGonSignal, kind=Kind.config) setpoint = Component(SmarGonSignal, kind=Kind.normal)
done = Component(SignalRO, value=1, kind=Kind.normal)
# moving = Component(SmarGonMovingSignalRO, kind=Kind.config)
moving = 1
def __init__( def __init__(
self, self,
@@ -149,26 +134,53 @@ class SmarGonAxis(Device):
parent=None, parent=None,
device_manager=None, device_manager=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000", sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
low_limit = None,
high_limit = None,
**kwargs, **kwargs,
) -> None: ) -> None:
self.__class__.__dict__['readback'].kwargs['prefix'] = prefix self.__class__.__dict__['readback'].kwargs['read_addr'] = f"readback{prefix}"
self.__class__.__dict__['readback'].kwargs['name'] = name self.__class__.__dict__['setpoint'].kwargs['write_addr'] = f"target{prefix}"
self.__class__.__dict__['setpoint'].kwargs['prefix'] = prefix self.__class__.__dict__['setpoint'].kwargs['low_limit'] = low_limit
self.__class__.__dict__['setpoint'].kwargs['name'] = name self.__class__.__dict__['setpoint'].kwargs['high_limit'] = high_limit
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs) # super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self.sg_url._metadata["write_access"] = False self.sg_url._metadata["write_access"] = False
self.sg_url.set(sg_url, force=True).wait() self.sg_url.set(sg_url, force=True).wait()
def initialize(self):
"""Helper function for initial readings"""
# self.corr.get()
# self.mode.get()
r = self._go_n_get("corr_type")
print(r)
def _go_n_get(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}")
return r.json()
def _go_n_put(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
r = requests.put(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}")
return r.json()
if __name__ == "__main__": if __name__ == "__main__":
shz = SmarGonAxis(prefix="SCS", name="shz", sg_url="http://x06da-smargopolo.psi.ch:3000") shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000")
sg = SmarGonClient(prefix="X06DA-ES", name="smargon") shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000")
sg.wait_for_connection() shz = SmarGonAxis(prefix="SCS", name="shz", low_limit=10, high_limit=22, sg_url="http://x06da-smargopolo.psi.ch:3000")
shx.wait_for_connection()
shy.wait_for_connection()
shz.wait_for_connection()