First tries with ABR
This commit is contained in:
94
pxiii_bec/devices/SmarGon.py
Normal file
94
pxiii_bec/devices/SmarGon.py
Normal file
@@ -0,0 +1,94 @@
|
||||
import time
|
||||
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind, Device, Signal
|
||||
from ophyd.status import SubscriptionStatus
|
||||
|
||||
import requests
|
||||
|
||||
try:
|
||||
from bec_lib import bec_logger
|
||||
logger = bec_logger.logger
|
||||
except ModuleNotFoundError:
|
||||
import logging
|
||||
logger = logging.getLogger("A3200")
|
||||
|
||||
|
||||
class SmarGonSignal(Signal):
|
||||
"""Small helper class to read PVs that need to be processed first."""
|
||||
|
||||
def __init__(self, group, addr, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.group = group
|
||||
self.addr = addr
|
||||
# self.get()
|
||||
|
||||
def put(self, value, *args, **kwargs):
|
||||
r = self.parent._go_n_put(f"target{self.group}?{self.addr}={value}")
|
||||
print(r)
|
||||
return super().put(*args, **kwargs)
|
||||
|
||||
def get(self, *args, **kwargs):
|
||||
r = self.parent._go_n_get(f"target{self.group}?{self.addr}")
|
||||
print(r)
|
||||
return super().get(*args, **kwargs)
|
||||
|
||||
|
||||
class SmarGonClient(Device):
|
||||
"""SmarGon client deice
|
||||
|
||||
This class controls the SmarGon goniometer via the REST interface.
|
||||
"""
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
USER_ACCESS = ["set_daq_config", "get_daq_config", "nuke", "connect", "message", "state", "bluestage", "blueunstage"]
|
||||
|
||||
# Status attributes
|
||||
sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
|
||||
|
||||
# Axis parameters
|
||||
shx = Component(SmarGonSignal, group="SCS", name="shx", kind=Kind.config)
|
||||
# shy = Component(SmarGonSignal, group="SCS", name="shy", kind=Kind.config)
|
||||
# shz = Component(SmarGonSignal, group="SCS", name="shz", kind=Kind.config)
|
||||
# chi = Component(SmarGonSignal, group="SCS", name="chi", kind=Kind.config)
|
||||
# phi = Component(SmarGonSignal, group="SCS", name="phi", kind=Kind.config)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
prefix="",
|
||||
*,
|
||||
name,
|
||||
kind=None,
|
||||
read_attrs=None,
|
||||
configuration_attrs=None,
|
||||
parent=None,
|
||||
device_manager=None,
|
||||
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
|
||||
**kwargs,
|
||||
) -> None:
|
||||
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
|
||||
|
||||
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
|
||||
self.sg_url._metadata["write_access"] = False
|
||||
self.sg_url.set(sg_url, force=True).wait()
|
||||
|
||||
def _go_n_get(self, name, **kwargs):
|
||||
cmd = f"{self.sg_url.get()}/{name}"
|
||||
r = requests.get(cmd, timeout=1)
|
||||
if not r.ok:
|
||||
raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}")
|
||||
return r.json()
|
||||
|
||||
def _go_n_put(self, name, **kwargs):
|
||||
cmd = f"{self.sg_url.get()}/{name}"
|
||||
r = requests.put(cmd, timeout=1)
|
||||
if not r.ok:
|
||||
raise RuntimeError(f"[self.name] Error putting {name}; server returned {r.status_code} => {r.reason}")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sg = SmarGonClient(prefix="X06DA-ES", name="smargon")
|
||||
sg.wait_for_connection()
|
||||
|
||||
|
||||
|
||||
391
pxiii_bec/devices/SmarGon_orig.py
Normal file
391
pxiii_bec/devices/SmarGon_orig.py
Normal file
@@ -0,0 +1,391 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from time import sleep, time
|
||||
from typing import Tuple
|
||||
|
||||
from requests import get, put
|
||||
|
||||
from beamline import beamline
|
||||
from mx_redis import SMARGON
|
||||
|
||||
try:
|
||||
from mx_preferences import get_config
|
||||
|
||||
host = get_config(beamline)["smargon"]["host"]
|
||||
port = get_config(beamline)["smargon"]["port"]
|
||||
except Exception:
|
||||
host = "x06da-smargopolo.psi.ch"
|
||||
port = 3000
|
||||
base = f"http://{host}:{port}"
|
||||
|
||||
|
||||
def gonget(thing: str, **kwargs) -> dict:
|
||||
"""issue a GET for some API component on the smargopolo server"""
|
||||
cmd = f"{base}/{thing}"
|
||||
if kwargs.get("verbose", False):
|
||||
print(cmd)
|
||||
r = get(cmd)
|
||||
if not r.ok:
|
||||
raise Exception(f"error getting {thing}; server returned {r.status_code} => {r.reason}")
|
||||
return r.json()
|
||||
|
||||
|
||||
def gonput(thing: str, **kwargs):
|
||||
"""issue a PUT for some API component on the smargopolo server"""
|
||||
cmd = f"{base}/{thing}"
|
||||
if kwargs.get("verbose", False):
|
||||
print(cmd)
|
||||
put(cmd)
|
||||
|
||||
|
||||
def scsput(**kwargs):
|
||||
"""
|
||||
Issue a new absolute target in the SH coordinate system.
|
||||
|
||||
The key "verbose" may be passed in kwargs with any true
|
||||
value for verbose behaviour.
|
||||
|
||||
|
||||
:param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
|
||||
:type kwargs: dict
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")}
|
||||
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
|
||||
cmd = f"{base}/targetSCS?{thing}"
|
||||
if kwargs.get("verbose", False):
|
||||
print(cmd)
|
||||
put(cmd)
|
||||
|
||||
|
||||
def bcsput(**kwargs):
|
||||
"""
|
||||
Issue a new absolute target in the beamline coordinate system.
|
||||
|
||||
The key "verbose" may be passed in kwargs with any true
|
||||
value for verbose behaviour.
|
||||
|
||||
|
||||
:param kwargs: a dict containing keys ("bx", "by", "bz", "chi", "phi")
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz", "chi", "phi")}
|
||||
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
|
||||
cmd = f"{base}/targetBCS?{thing}"
|
||||
if kwargs.get("verbose", False):
|
||||
print(cmd)
|
||||
put(cmd)
|
||||
|
||||
|
||||
def scsrelput(**kwargs) -> None:
|
||||
"""
|
||||
Issue relative increments to current SH coordinate system.
|
||||
|
||||
The key "verbose" may be passed in kwargs with any true
|
||||
value for verbose behaviour.
|
||||
|
||||
|
||||
:param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
|
||||
:type kwargs: dict
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")}
|
||||
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
|
||||
cmd = f"{base}/targetSCS_rel?{thing}"
|
||||
if kwargs.get("verbose", False):
|
||||
print(cmd)
|
||||
put(cmd)
|
||||
|
||||
|
||||
def bcsrelput(**kwargs):
|
||||
"""
|
||||
Issue relative increments to current beamline coordinate system.
|
||||
|
||||
The key "verbose" may be passed in kwargs with any true
|
||||
value for verbose behaviour.
|
||||
|
||||
:param kwargs: a dict containing keys ("bx", "by", "bz")
|
||||
:type kwargs: dict
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz")}
|
||||
thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
|
||||
cmd = f"{base}/targetBCS_rel?{thing}"
|
||||
if kwargs.get("verbose", False):
|
||||
print(cmd)
|
||||
put(cmd)
|
||||
|
||||
|
||||
# url_redis = f"{beamline}-cons-705.psi.ch"
|
||||
# print(f"connecting to redis DB #3 on host: {url_redis}")
|
||||
# redis_handle = redis.StrictRedis(host=url_redis, db=3)
|
||||
# pubsub = redis_handle.pubsub()
|
||||
|
||||
MODE_UNINITIALIZED = 0
|
||||
MODE_INITIALIZING = 1
|
||||
MODE_READY = 2
|
||||
MODE_ERROR = 99
|
||||
|
||||
|
||||
class SmarGon(object):
|
||||
def __init__(self):
|
||||
super(SmarGon, self).__init__()
|
||||
self.__dict__.update(target=None)
|
||||
self.__dict__.update(bookmarks={})
|
||||
self.__dict__.update(_latest_message={})
|
||||
# pubsub.psubscribe(**{f"__keyspace@{SMARGON.value}__:*": self._cb_readbackSCS})
|
||||
# pubsub.run_in_thread(sleep_time=0.5, daemon=True)
|
||||
|
||||
def __repr__(self):
|
||||
BX, BY, BZ, OMEGA, CHI, PHI, a, b, c = self.readback_bcs().values()
|
||||
return f"<{self.__class__.__name__} X={BX:.3f}, Y={BY:.3f}, Z={BZ:.3f}, CHI={CHI:.3f}, PHI={PHI:.3f}, OMEGA={OMEGA:.3f}>"
|
||||
|
||||
def _cb_readbackSCS(self, msg):
|
||||
if msg["data"] in ["hset"]:
|
||||
self._latest_message = msg
|
||||
|
||||
def move_home(self, wait=False) -> None:
|
||||
"""move to beamline coordinate system X, Y, Z, Chi, Phi = 0 0 0 0 0"""
|
||||
self.apply_bookmark_sh({"shx": 0.0, "shy": 0.0, "shz": 18.0, "chi": 0.0, "phi": 0.0})
|
||||
if wait:
|
||||
self.wait_home()
|
||||
|
||||
def xyz(self, coords: Tuple[float, float, float], wait: bool = True) -> None:
|
||||
"""
|
||||
Move smargon in absolute beamline coordinates
|
||||
|
||||
:param coords: a tuple of floats representing X, Y, Z coordinates
|
||||
:type coords:
|
||||
:param wait:
|
||||
:type wait:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
x, y, z = coords
|
||||
# the two steps below are necessary otherwise the control system
|
||||
# remembers *a* previous CHI
|
||||
bcs = self.bcs
|
||||
bcs.update({"BX": x, "BY": y, "BZ": z})
|
||||
self.bcs = bcs
|
||||
if wait:
|
||||
self.wait()
|
||||
|
||||
def wait_home(self, timeout: float = 20.0) -> None:
|
||||
"""
|
||||
wait for the smargon to reach its home position:
|
||||
SHX = 0.0
|
||||
SHY = 0.0
|
||||
SHZ = 18.0
|
||||
CHI = 0.0
|
||||
PHI = 0.0
|
||||
|
||||
:param timeout: time to wait for positions to be reached raises TimeoutError if timeout reached
|
||||
:type timeout: float
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
tout = timeout + time()
|
||||
in_place = [False, False]
|
||||
rbv = -999.0
|
||||
while not all(in_place) and time() < tout:
|
||||
rbv = self.readback_scs()
|
||||
in_place = []
|
||||
for k, v in {
|
||||
"SHX": 0.0,
|
||||
"SHY": 0.0,
|
||||
"SHZ": 18.0,
|
||||
"CHI": 0.0,
|
||||
"PHI": 0.0,
|
||||
}.items():
|
||||
in_place.append(abs(rbv[k] - v) < 0.01)
|
||||
if time() > tout:
|
||||
raise TimeoutError(f"timeout waiting for smargon to reach home position: {rbv}")
|
||||
|
||||
def push_bookmark(self):
|
||||
"""
|
||||
save current absolute coordinates in FIFO stack
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
t = round(time())
|
||||
self.bookmarks[t] = self.readback_scs()
|
||||
|
||||
def pop_bookmark(self):
|
||||
return self.bookmarks.popitem()[1]
|
||||
|
||||
def apply_bookmark_sh(self, scs):
|
||||
scsput(**scs)
|
||||
|
||||
def apply_last_bookmark_sh(self):
|
||||
scs = self.pop_bookmark()
|
||||
scsput(**scs)
|
||||
|
||||
def readback_mcs(self):
|
||||
"""current motor positions of the smargon sliders"""
|
||||
return gonget("readbackMCS")
|
||||
|
||||
def readback_scs(self):
|
||||
"""current SH coordinates of the smargon model"""
|
||||
return gonget("readbackSCS")
|
||||
|
||||
def readback_bcs(self):
|
||||
"""current beamline coordinates of the smargon"""
|
||||
return gonget("readbackBCS")
|
||||
|
||||
def target_scs(self):
|
||||
"""currently assigned targets for the smargon control system"""
|
||||
return gonget("targetSCS")
|
||||
|
||||
def initialize(self):
|
||||
"""initialize the smargon"""
|
||||
self.set_mode(MODE_UNINITIALIZED)
|
||||
sleep(0.1)
|
||||
self.set_mode(MODE_INITIALIZING)
|
||||
|
||||
def set_mode(self, mode: int):
|
||||
"""put smargon control system in a given mode
|
||||
MODE_UNINITIALIZED = 0
|
||||
MODE_INITIALIZING = 1
|
||||
MODE_READY = 2
|
||||
MODE_ERROR = 99
|
||||
"""
|
||||
gonput(f"mode?mode={mode}")
|
||||
|
||||
def enable_correction(self):
|
||||
"""enable calibration based corrections"""
|
||||
gonput("corr_type?corr_type=1")
|
||||
|
||||
def disable_correction(self):
|
||||
"""disable calibration based corrections"""
|
||||
gonput("corr_type?corr_type=0")
|
||||
|
||||
def chi(self, val=None, wait=False):
|
||||
if val is None:
|
||||
return self.readback_scs()["CHI"]
|
||||
scsput(CHI=val)
|
||||
if wait:
|
||||
timeout = 10 + time()
|
||||
while time() < timeout:
|
||||
if abs(val - self.readback_scs()["CHI"]) < 0.1:
|
||||
break
|
||||
if time() > timeout:
|
||||
raise RuntimeError(f"SmarGon CHI did not reach requested target {val} in time")
|
||||
|
||||
def phi(self, val=None, wait=False):
|
||||
if val is None:
|
||||
return self.readback_scs()["PHI"]
|
||||
scsput(PHI=val)
|
||||
if wait:
|
||||
timeout = 70 + time()
|
||||
while time() < timeout:
|
||||
if abs(val - self.readback_scs()["PHI"]) < 0.1:
|
||||
break
|
||||
if time() > timeout:
|
||||
raise RuntimeError(f"SmarGon PHI did not reach requested target {val} in time")
|
||||
|
||||
def wait(self, timeout=60.0):
|
||||
"""waits up to `timeout` seconds for smargon to reach target"""
|
||||
target = {
|
||||
k.upper(): v for k, v in self.target_scs().items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
|
||||
}
|
||||
|
||||
timeout = timeout + time()
|
||||
while time() < timeout:
|
||||
s = {
|
||||
k: (abs(v - target[k]) < 0.01)
|
||||
for k, v in self.readback_scs().items()
|
||||
if k.upper() in ("SHX", "SHY", "SHZ", "CHI", "PHI")
|
||||
}
|
||||
if all(list(s.values())):
|
||||
break
|
||||
if time() > timeout:
|
||||
raise TimeoutError("timed out waiting for smargon to reach target")
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
key = key.lower()
|
||||
if key == "mode":
|
||||
self.set_mode(value)
|
||||
elif key == "correction":
|
||||
assert value in (
|
||||
0,
|
||||
1,
|
||||
False,
|
||||
True,
|
||||
), "correction is either 1 or True (enabled) or 0 (disabled)"
|
||||
gonput(f"corr_type?corr_type?{value}")
|
||||
elif key == "scs":
|
||||
scsput(**value)
|
||||
elif key == "bcs":
|
||||
bcsput(**value)
|
||||
elif key == "target":
|
||||
if not isinstance(value, dict):
|
||||
raise Exception(f"expected a dict with target axis and values got something else: {value}")
|
||||
for k in value.keys():
|
||||
if k.lower() not in "shx shy shz chi phi ox oy oz".split():
|
||||
raise Exception(f'unknown axis in target "{k}"')
|
||||
scsput(**value)
|
||||
elif key in "shx shy shz chi phi ox oy oz".split():
|
||||
scsput(**{key: value})
|
||||
elif key in "bx by bz".split():
|
||||
bcs = self.readback_bcs()
|
||||
bcs[key] = value
|
||||
bcsput(**bcs)
|
||||
else:
|
||||
self.__dict__[key].update(value)
|
||||
|
||||
def __getattr__(self, key):
|
||||
key = key.lower()
|
||||
if key == "mode":
|
||||
return self.readback_mcs()["mode"]
|
||||
elif key == "correction":
|
||||
return gonget("corr_type")
|
||||
elif key == "bcs":
|
||||
return self.readback_bcs()
|
||||
elif key == "mcs":
|
||||
return self.readback_mcs()
|
||||
elif key == "scs":
|
||||
return self.readback_scs()
|
||||
elif key in "shx shy shz chi phi ox oy oz".split():
|
||||
return self.readback_scs()[key.upper()]
|
||||
elif key in "bx by bz".split():
|
||||
return self.readback_bcs()[key.upper()]
|
||||
else:
|
||||
return self.__getattribute__(key)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="SmarGon client")
|
||||
parser.add_argument("-i", "--initialize", help="initialize smargon", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
smargon = SmarGon()
|
||||
|
||||
if args.initialize:
|
||||
print("initializing smargon device")
|
||||
import Aerotech
|
||||
|
||||
print("moving aerotech back by 50mm")
|
||||
abr = Aerotech.Abr()
|
||||
|
||||
abr.incr_x(-50.0, wait=True, velo=100.0)
|
||||
|
||||
print("issuing init command to smargon")
|
||||
smargon.initialize()
|
||||
sleep(0.5)
|
||||
|
||||
print("waiting for init routine to complete")
|
||||
while MODE_READY != smargon.mode:
|
||||
sleep(0.5)
|
||||
|
||||
print("moving smargon to HOME position")
|
||||
smargon.move_home()
|
||||
|
||||
print("moving aerotech to its previous position")
|
||||
abr.incr_x(50.0, wait=True, velo=100.0)
|
||||
exit(0)
|
||||
Reference in New Issue
Block a user