First tries with ABR

This commit is contained in:
gac-x06da
2025-01-15 14:46:51 +01:00
parent 0b99a82ae9
commit d3d016108e
2 changed files with 485 additions and 0 deletions

View File

@@ -0,0 +1,94 @@
import time
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind, Device, Signal
from ophyd.status import SubscriptionStatus
import requests
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("A3200")
class SmarGonSignal(Signal):
"""Small helper class to read PVs that need to be processed first."""
def __init__(self, group, addr, *args, **kwargs):
super().__init__(*args, **kwargs)
self.group = group
self.addr = addr
# self.get()
def put(self, value, *args, **kwargs):
r = self.parent._go_n_put(f"target{self.group}?{self.addr}={value}")
print(r)
return super().put(*args, **kwargs)
def get(self, *args, **kwargs):
r = self.parent._go_n_get(f"target{self.group}?{self.addr}")
print(r)
return super().get(*args, **kwargs)
class SmarGonClient(Device):
"""SmarGon client deice
This class controls the SmarGon goniometer via the REST interface.
"""
# pylint: disable=too-many-instance-attributes
USER_ACCESS = ["set_daq_config", "get_daq_config", "nuke", "connect", "message", "state", "bluestage", "blueunstage"]
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
# Axis parameters
shx = Component(SmarGonSignal, group="SCS", name="shx", kind=Kind.config)
# shy = Component(SmarGonSignal, group="SCS", name="shy", kind=Kind.config)
# shz = Component(SmarGonSignal, group="SCS", name="shz", kind=Kind.config)
# chi = Component(SmarGonSignal, group="SCS", name="chi", kind=Kind.config)
# phi = Component(SmarGonSignal, group="SCS", name="phi", kind=Kind.config)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
**kwargs,
) -> None:
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self.sg_url._metadata["write_access"] = False
self.sg_url.set(sg_url, force=True).wait()
def _go_n_get(self, name, **kwargs):
cmd = f"{self.sg_url.get()}/{name}"
r = requests.get(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}")
return r.json()
def _go_n_put(self, name, **kwargs):
cmd = f"{self.sg_url.get()}/{name}"
r = requests.put(cmd, timeout=1)
if not r.ok:
raise RuntimeError(f"[self.name] Error putting {name}; server returned {r.status_code} => {r.reason}")
if __name__ == "__main__":
sg = SmarGonClient(prefix="X06DA-ES", name="smargon")
sg.wait_for_connection()

View File

@@ -0,0 +1,391 @@
#!/usr/bin/env python3
from time import sleep, time
from typing import Tuple
from requests import get, put
from beamline import beamline
from mx_redis import SMARGON
try:
from mx_preferences import get_config
host = get_config(beamline)["smargon"]["host"]
port = get_config(beamline)["smargon"]["port"]
except Exception:
host = "x06da-smargopolo.psi.ch"
port = 3000
base = f"http://{host}:{port}"
def gonget(thing: str, **kwargs) -> dict:
"""issue a GET for some API component on the smargopolo server"""
cmd = f"{base}/{thing}"
if kwargs.get("verbose", False):
print(cmd)
r = get(cmd)
if not r.ok:
raise Exception(f"error getting {thing}; server returned {r.status_code} => {r.reason}")
return r.json()
def gonput(thing: str, **kwargs):
"""issue a PUT for some API component on the smargopolo server"""
cmd = f"{base}/{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
def scsput(**kwargs):
"""
Issue a new absolute target in the SH coordinate system.
The key "verbose" may be passed in kwargs with any true
value for verbose behaviour.
:param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
:type kwargs: dict
:return:
:rtype:
"""
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetSCS?{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
def bcsput(**kwargs):
"""
Issue a new absolute target in the beamline coordinate system.
The key "verbose" may be passed in kwargs with any true
value for verbose behaviour.
:param kwargs: a dict containing keys ("bx", "by", "bz", "chi", "phi")
:return:
:rtype:
"""
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz", "chi", "phi")}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetBCS?{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
def scsrelput(**kwargs) -> None:
"""
Issue relative increments to current SH coordinate system.
The key "verbose" may be passed in kwargs with any true
value for verbose behaviour.
:param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
:type kwargs: dict
:return:
:rtype:
"""
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetSCS_rel?{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
def bcsrelput(**kwargs):
"""
Issue relative increments to current beamline coordinate system.
The key "verbose" may be passed in kwargs with any true
value for verbose behaviour.
:param kwargs: a dict containing keys ("bx", "by", "bz")
:type kwargs: dict
:return:
:rtype:
"""
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz")}
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
cmd = f"{base}/targetBCS_rel?{thing}"
if kwargs.get("verbose", False):
print(cmd)
put(cmd)
# url_redis = f"{beamline}-cons-705.psi.ch"
# print(f"connecting to redis DB #3 on host: {url_redis}")
# redis_handle = redis.StrictRedis(host=url_redis, db=3)
# pubsub = redis_handle.pubsub()
MODE_UNINITIALIZED = 0
MODE_INITIALIZING = 1
MODE_READY = 2
MODE_ERROR = 99
class SmarGon(object):
def __init__(self):
super(SmarGon, self).__init__()
self.__dict__.update(target=None)
self.__dict__.update(bookmarks={})
self.__dict__.update(_latest_message={})
# pubsub.psubscribe(**{f"__keyspace@{SMARGON.value}__:*": self._cb_readbackSCS})
# pubsub.run_in_thread(sleep_time=0.5, daemon=True)
def __repr__(self):
BX, BY, BZ, OMEGA, CHI, PHI, a, b, c = self.readback_bcs().values()
return f"<{self.__class__.__name__} X={BX:.3f}, Y={BY:.3f}, Z={BZ:.3f}, CHI={CHI:.3f}, PHI={PHI:.3f}, OMEGA={OMEGA:.3f}>"
def _cb_readbackSCS(self, msg):
if msg["data"] in ["hset"]:
self._latest_message = msg
def move_home(self, wait=False) -> None:
"""move to beamline coordinate system X, Y, Z, Chi, Phi = 0 0 0 0 0"""
self.apply_bookmark_sh({"shx": 0.0, "shy": 0.0, "shz": 18.0, "chi": 0.0, "phi": 0.0})
if wait:
self.wait_home()
def xyz(self, coords: Tuple[float, float, float], wait: bool = True) -> None:
"""
Move smargon in absolute beamline coordinates
:param coords: a tuple of floats representing X, Y, Z coordinates
:type coords:
:param wait:
:type wait:
:return:
:rtype:
"""
x, y, z = coords
# the two steps below are necessary otherwise the control system
# remembers *a* previous CHI
bcs = self.bcs
bcs.update({"BX": x, "BY": y, "BZ": z})
self.bcs = bcs
if wait:
self.wait()
def wait_home(self, timeout: float = 20.0) -> None:
"""
wait for the smargon to reach its home position:
SHX = 0.0
SHY = 0.0
SHZ = 18.0
CHI = 0.0
PHI = 0.0
:param timeout: time to wait for positions to be reached raises TimeoutError if timeout reached
:type timeout: float
:return:
:rtype:
"""
tout = timeout + time()
in_place = [False, False]
rbv = -999.0
while not all(in_place) and time() < tout:
rbv = self.readback_scs()
in_place = []
for k, v in {
"SHX": 0.0,
"SHY": 0.0,
"SHZ": 18.0,
"CHI": 0.0,
"PHI": 0.0,
}.items():
in_place.append(abs(rbv[k] - v) < 0.01)
if time() > tout:
raise TimeoutError(f"timeout waiting for smargon to reach home position: {rbv}")
def push_bookmark(self):
"""
save current absolute coordinates in FIFO stack
:return:
:rtype:
"""
t = round(time())
self.bookmarks[t] = self.readback_scs()
def pop_bookmark(self):
return self.bookmarks.popitem()[1]
def apply_bookmark_sh(self, scs):
scsput(**scs)
def apply_last_bookmark_sh(self):
scs = self.pop_bookmark()
scsput(**scs)
def readback_mcs(self):
"""current motor positions of the smargon sliders"""
return gonget("readbackMCS")
def readback_scs(self):
"""current SH coordinates of the smargon model"""
return gonget("readbackSCS")
def readback_bcs(self):
"""current beamline coordinates of the smargon"""
return gonget("readbackBCS")
def target_scs(self):
"""currently assigned targets for the smargon control system"""
return gonget("targetSCS")
def initialize(self):
"""initialize the smargon"""
self.set_mode(MODE_UNINITIALIZED)
sleep(0.1)
self.set_mode(MODE_INITIALIZING)
def set_mode(self, mode: int):
"""put smargon control system in a given mode
MODE_UNINITIALIZED = 0
MODE_INITIALIZING = 1
MODE_READY = 2
MODE_ERROR = 99
"""
gonput(f"mode?mode={mode}")
def enable_correction(self):
"""enable calibration based corrections"""
gonput("corr_type?corr_type=1")
def disable_correction(self):
"""disable calibration based corrections"""
gonput("corr_type?corr_type=0")
def chi(self, val=None, wait=False):
if val is None:
return self.readback_scs()["CHI"]
scsput(CHI=val)
if wait:
timeout = 10 + time()
while time() < timeout:
if abs(val - self.readback_scs()["CHI"]) < 0.1:
break
if time() > timeout:
raise RuntimeError(f"SmarGon CHI did not reach requested target {val} in time")
def phi(self, val=None, wait=False):
if val is None:
return self.readback_scs()["PHI"]
scsput(PHI=val)
if wait:
timeout = 70 + time()
while time() < timeout:
if abs(val - self.readback_scs()["PHI"]) < 0.1:
break
if time() > timeout:
raise RuntimeError(f"SmarGon PHI did not reach requested target {val} in time")
def wait(self, timeout=60.0):
"""waits up to `timeout` seconds for smargon to reach target"""
target = {
k.upper(): v for k, v in self.target_scs().items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
}
timeout = timeout + time()
while time() < timeout:
s = {
k: (abs(v - target[k]) < 0.01)
for k, v in self.readback_scs().items()
if k.upper() in ("SHX", "SHY", "SHZ", "CHI", "PHI")
}
if all(list(s.values())):
break
if time() > timeout:
raise TimeoutError("timed out waiting for smargon to reach target")
def __setattr__(self, key, value):
key = key.lower()
if key == "mode":
self.set_mode(value)
elif key == "correction":
assert value in (
0,
1,
False,
True,
), "correction is either 1 or True (enabled) or 0 (disabled)"
gonput(f"corr_type?corr_type?{value}")
elif key == "scs":
scsput(**value)
elif key == "bcs":
bcsput(**value)
elif key == "target":
if not isinstance(value, dict):
raise Exception(f"expected a dict with target axis and values got something else: {value}")
for k in value.keys():
if k.lower() not in "shx shy shz chi phi ox oy oz".split():
raise Exception(f'unknown axis in target "{k}"')
scsput(**value)
elif key in "shx shy shz chi phi ox oy oz".split():
scsput(**{key: value})
elif key in "bx by bz".split():
bcs = self.readback_bcs()
bcs[key] = value
bcsput(**bcs)
else:
self.__dict__[key].update(value)
def __getattr__(self, key):
key = key.lower()
if key == "mode":
return self.readback_mcs()["mode"]
elif key == "correction":
return gonget("corr_type")
elif key == "bcs":
return self.readback_bcs()
elif key == "mcs":
return self.readback_mcs()
elif key == "scs":
return self.readback_scs()
elif key in "shx shy shz chi phi ox oy oz".split():
return self.readback_scs()[key.upper()]
elif key in "bx by bz".split():
return self.readback_bcs()[key.upper()]
else:
return self.__getattribute__(key)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="SmarGon client")
parser.add_argument("-i", "--initialize", help="initialize smargon", action="store_true")
args = parser.parse_args()
smargon = SmarGon()
if args.initialize:
print("initializing smargon device")
import Aerotech
print("moving aerotech back by 50mm")
abr = Aerotech.Abr()
abr.incr_x(-50.0, wait=True, velo=100.0)
print("issuing init command to smargon")
smargon.initialize()
sleep(0.5)
print("waiting for init routine to complete")
while MODE_READY != smargon.mode:
sleep(0.5)
print("moving smargon to HOME position")
smargon.move_home()
print("moving aerotech to its previous position")
abr.incr_x(50.0, wait=True, velo=100.0)
exit(0)