axis client getting ready

This commit is contained in:
gac-x06da
2025-01-22 17:42:50 +01:00
parent 8da2ed4102
commit a8990f8de2

View File

@@ -1,5 +1,5 @@
import time
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind, Device, Signal
from ophyd import Component, Device, Kind, Device, Signal, SignalRO
from ophyd.status import SubscriptionStatus
import requests
@@ -9,121 +9,101 @@ try:
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("A3200")
logger = logging.getLogger("SmarGon")
class SmarGonSignal(Signal):
"""Small helper class to read PVs that need to be processed first."""
""" SmarGonSignal (R/W)
Small helper class to read/write parameters from SmarGon. As there is no
motion status readback from smargopolo, this should be substituted with
setting with 'settle_time'.
"""
def __init__(self, prefix, *args, **kwargs):
def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs):
super().__init__(*args, **kwargs)
self.prefix = prefix
self.write_addr = write_addr
self.addr = self.parent.name
self._limits = (low_limit, high_limit)
# self.get()
def put(self, value, *args, **kwargs):
self._go_n_put(f"target{self.prefix}?{self.addr}={value}")
return super().put(value, *args, force=True, **kwargs)
def put(self, value, *args, timestamp=None, **kwargs):
""" Overriden put to add communication with smargopolo"""
# Validate new value
self.check_value(value)
if timestamp is None:
timestamp = time.time()
# Perform the actual write to SmargoPolo
r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}")
old_value = self._readback
self._timestamp = timestamp
self._readback = r[self.addr.upper()]
self.value = r[self.addr.upper()]
# Notify subscribers
self._run_subs(sub_type=self.SUB_VALUE, old_value=old_value,
value=value, timestamp=self._timestamp)
@property
def limits(self):
return self._limits
def check_value(self, value, **kwargs):
""" Check if value falls within limits"""
lol = self.limits[0]
if lol is not None:
if value < lol:
raise ValueError(f"Target {value} outside of limits {self.limits}")
hil = self.limits[1]
if hil is not None:
if value > hil:
raise ValueError(f"Target {value} outside of limits {self.limits}")
def get(self, *args, **kwargs):
r = self._go_n_get(f"target{self.prefix}")
r = self.parent._go_n_get(self.write_addr)
print(r)
self.value = r[self.addr.upper()]
return super().get(*args, **kwargs)
def _go_n_get(self, name, **kwargs):
cmd = f"{self.parent.sg_url.get()}/{name}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}")
return r.json()
def _go_n_put(self, name, **kwargs):
cmd = f"{self.parent.sg_url.get()}/{name}"
r = requests.put(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error putting {name}; server returned {r.status_code} => {r.reason}")
class SmarGonSignalRO(Signal):
"""Small helper class to read PVs that need to be processed first.
TODO: Add monitoring
""" Small helper class for read-only parameters PVs from SmarGon.
TODO: Add monitoring
"""
def __init__(self, prefix, *args, **kwargs):
def __init__(self, *args, read_addr="readbackSCS", **kwargs):
super().__init__(*args, **kwargs)
self._metadata["write_access"] = False
self.prefix = prefix
self.read_addr = read_addr
self.addr = self.parent.name
def get(self, *args, **kwargs):
r = self._go_n_get(f"readback{self.prefix}")
r = self.parent._go_n_get(self.read_addr)
print(r)
self.put(r[self.addr.upper()], force=True)
return super().get(*args, **kwargs)
def _go_n_get(self, name, **kwargs):
cmd = f"{self.parent.sg_url.get()}/{name}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}")
return r.json()
# class SmarGonMovingSignalRO(Signal):
# """Small helper class to read PVs that need to be processed first.
# TODO: Add monitoring
# """
class SmarGonClient(Device):
"""SmarGon client deice
This class controls the SmarGon goniometer via the REST interface.
"""
# pylint: disable=too-many-instance-attributes
USER_ACCESS = ["set_daq_config", "get_daq_config", "nuke", "connect", "message", "state", "bluestage", "blueunstage"]
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
# Axis parameters
shx = Component(SmarGonSignal, group="SCS", addr="shx", kind=Kind.config)
# shy = Component(SmarGonSignal, group="SCS", addr="shy", kind=Kind.config)
# shz = Component(SmarGonSignal, group="SCS", addr="shz", kind=Kind.config)
# chi = Component(SmarGonSignal, group="SCS", addr="chi", kind=Kind.config)
# phi = Component(SmarGonSignal, group="SCS", addr="phi", kind=Kind.config)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
**kwargs,
) -> None:
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self.sg_url._metadata["write_access"] = False
self.sg_url.set(sg_url, force=True).wait()
def _go_n_get(self, name, **kwargs):
cmd = f"{self.sg_url.get()}/{name}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}")
return r.json()
def _go_n_put(self, name, **kwargs):
cmd = f"{self.sg_url.get()}/{name}"
r = requests.put(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error putting {name}; server returned {r.status_code} => {r.reason}")
# def __init__(self, *args, **kwargs):
# super().__init__(*args, **kwargs)
# self._metadata["write_access"] = False
# def get(self, *args, **kwargs):
# r = self.parent._go_n_get("readbackMCS")
# print(r['state'])
# moving_str = r["state"]["q1"] + r["state"]["q2"] + r["state"]["q3"] + r["state"]["q4"] + r["state"]["q5"][0]
# moving = int(moving_str)
# self.put(moving_str, force=True)
# return super().get(*args, **kwargs)
class SmarGonAxis(Device):
@@ -133,10 +113,15 @@ class SmarGonAxis(Device):
"""
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config)
mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config)
# Axis parameters
readback = Component(SmarGonSignalRO, kind=Kind.config)
setpoint = Component(SmarGonSignal, kind=Kind.config)
readback = Component(SmarGonSignalRO, kind=Kind.hinted)
setpoint = Component(SmarGonSignal, kind=Kind.normal)
done = Component(SignalRO, value=1, kind=Kind.normal)
# moving = Component(SmarGonMovingSignalRO, kind=Kind.config)
moving = 1
def __init__(
self,
@@ -149,26 +134,53 @@ class SmarGonAxis(Device):
parent=None,
device_manager=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
low_limit = None,
high_limit = None,
**kwargs,
) -> None:
self.__class__.__dict__['readback'].kwargs['prefix'] = prefix
self.__class__.__dict__['readback'].kwargs['name'] = name
self.__class__.__dict__['setpoint'].kwargs['prefix'] = prefix
self.__class__.__dict__['setpoint'].kwargs['name'] = name
self.__class__.__dict__['readback'].kwargs['read_addr'] = f"readback{prefix}"
self.__class__.__dict__['setpoint'].kwargs['write_addr'] = f"target{prefix}"
self.__class__.__dict__['setpoint'].kwargs['low_limit'] = low_limit
self.__class__.__dict__['setpoint'].kwargs['high_limit'] = high_limit
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self.sg_url._metadata["write_access"] = False
self.sg_url.set(sg_url, force=True).wait()
def initialize(self):
"""Helper function for initial readings"""
# self.corr.get()
# self.mode.get()
r = self._go_n_get("corr_type")
print(r)
def _go_n_get(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}")
return r.json()
def _go_n_put(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
r = requests.put(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}")
return r.json()
if __name__ == "__main__":
shz = SmarGonAxis(prefix="SCS", name="shz", sg_url="http://x06da-smargopolo.psi.ch:3000")
sg = SmarGonClient(prefix="X06DA-ES", name="smargon")
sg.wait_for_connection()
shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000")
shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000")
shz = SmarGonAxis(prefix="SCS", name="shz", low_limit=10, high_limit=22, sg_url="http://x06da-smargopolo.psi.ch:3000")
shx.wait_for_connection()
shy.wait_for_connection()
shz.wait_for_connection()