diff --git a/pxiii_bec/devices/SmarGon.py b/pxiii_bec/devices/SmarGon.py new file mode 100644 index 0000000..aaa2b34 --- /dev/null +++ b/pxiii_bec/devices/SmarGon.py @@ -0,0 +1,94 @@ +import time +from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind, Device, Signal +from ophyd.status import SubscriptionStatus + +import requests + +try: + from bec_lib import bec_logger + logger = bec_logger.logger +except ModuleNotFoundError: + import logging + logger = logging.getLogger("A3200") + + +class SmarGonSignal(Signal): + """Small helper class to read PVs that need to be processed first.""" + + def __init__(self, group, addr, *args, **kwargs): + super().__init__(*args, **kwargs) + self.group = group + self.addr = addr + # self.get() + + def put(self, value, *args, **kwargs): + r = self.parent._go_n_put(f"target{self.group}?{self.addr}={value}") + print(r) + return super().put(*args, **kwargs) + + def get(self, *args, **kwargs): + r = self.parent._go_n_get(f"target{self.group}?{self.addr}") + print(r) + return super().get(*args, **kwargs) + + +class SmarGonClient(Device): + """SmarGon client deice + + This class controls the SmarGon goniometer via the REST interface. + """ + # pylint: disable=too-many-instance-attributes + USER_ACCESS = ["set_daq_config", "get_daq_config", "nuke", "connect", "message", "state", "bluestage", "blueunstage"] + + # Status attributes + sg_url = Component(Signal, kind=Kind.config, metadata={'write_access': False}) + + # Axis parameters + shx = Component(SmarGonSignal, group="SCS", name="shx", kind=Kind.config) + # shy = Component(SmarGonSignal, group="SCS", name="shy", kind=Kind.config) + # shz = Component(SmarGonSignal, group="SCS", name="shz", kind=Kind.config) + # chi = Component(SmarGonSignal, group="SCS", name="chi", kind=Kind.config) + # phi = Component(SmarGonSignal, group="SCS", name="phi", kind=Kind.config) + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + device_manager=None, + sg_url: str = "http://x06da-smargopolo.psi.ch:3000", + **kwargs, + ) -> None: + # super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs) + + super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) + self.sg_url._metadata["write_access"] = False + self.sg_url.set(sg_url, force=True).wait() + + def _go_n_get(self, name, **kwargs): + cmd = f"{self.sg_url.get()}/{name}" + r = requests.get(cmd, timeout=1) + if not r.ok: + raise RuntimeError(f"[self.name] Error getting {name}; server returned {r.status_code} => {r.reason}") + return r.json() + + def _go_n_put(self, name, **kwargs): + cmd = f"{self.sg_url.get()}/{name}" + r = requests.put(cmd, timeout=1) + if not r.ok: + raise RuntimeError(f"[self.name] Error putting {name}; server returned {r.status_code} => {r.reason}") + + + + + +if __name__ == "__main__": + sg = SmarGonClient(prefix="X06DA-ES", name="smargon") + sg.wait_for_connection() + + + diff --git a/pxiii_bec/devices/SmarGon_orig.py b/pxiii_bec/devices/SmarGon_orig.py new file mode 100644 index 0000000..bde885f --- /dev/null +++ b/pxiii_bec/devices/SmarGon_orig.py @@ -0,0 +1,391 @@ +#!/usr/bin/env python3 + +from time import sleep, time +from typing import Tuple + +from requests import get, put + +from beamline import beamline +from mx_redis import SMARGON + +try: + from mx_preferences import get_config + + host = get_config(beamline)["smargon"]["host"] + port = get_config(beamline)["smargon"]["port"] +except Exception: + host = "x06da-smargopolo.psi.ch" + port = 3000 +base = f"http://{host}:{port}" + + +def gonget(thing: str, **kwargs) -> dict: + """issue a GET for some API component on the smargopolo server""" + cmd = f"{base}/{thing}" + if kwargs.get("verbose", False): + print(cmd) + r = get(cmd) + if not r.ok: + raise Exception(f"error getting {thing}; server returned {r.status_code} => {r.reason}") + return r.json() + + +def gonput(thing: str, **kwargs): + """issue a PUT for some API component on the smargopolo server""" + cmd = f"{base}/{thing}" + if kwargs.get("verbose", False): + print(cmd) + put(cmd) + + +def scsput(**kwargs): + """ + Issue a new absolute target in the SH coordinate system. + + The key "verbose" may be passed in kwargs with any true + value for verbose behaviour. + + + :param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi") + :type kwargs: dict + :return: + :rtype: + """ + xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")} + thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()]) + cmd = f"{base}/targetSCS?{thing}" + if kwargs.get("verbose", False): + print(cmd) + put(cmd) + + +def bcsput(**kwargs): + """ + Issue a new absolute target in the beamline coordinate system. + + The key "verbose" may be passed in kwargs with any true + value for verbose behaviour. + + + :param kwargs: a dict containing keys ("bx", "by", "bz", "chi", "phi") + :return: + :rtype: + """ + xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz", "chi", "phi")} + thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()]) + cmd = f"{base}/targetBCS?{thing}" + if kwargs.get("verbose", False): + print(cmd) + put(cmd) + + +def scsrelput(**kwargs) -> None: + """ + Issue relative increments to current SH coordinate system. + + The key "verbose" may be passed in kwargs with any true + value for verbose behaviour. + + + :param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi") + :type kwargs: dict + :return: + :rtype: + """ + xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")} + thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()]) + cmd = f"{base}/targetSCS_rel?{thing}" + if kwargs.get("verbose", False): + print(cmd) + put(cmd) + + +def bcsrelput(**kwargs): + """ + Issue relative increments to current beamline coordinate system. + + The key "verbose" may be passed in kwargs with any true + value for verbose behaviour. + + :param kwargs: a dict containing keys ("bx", "by", "bz") + :type kwargs: dict + :return: + :rtype: + """ + xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz")} + thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()]) + cmd = f"{base}/targetBCS_rel?{thing}" + if kwargs.get("verbose", False): + print(cmd) + put(cmd) + + +# url_redis = f"{beamline}-cons-705.psi.ch" +# print(f"connecting to redis DB #3 on host: {url_redis}") +# redis_handle = redis.StrictRedis(host=url_redis, db=3) +# pubsub = redis_handle.pubsub() + +MODE_UNINITIALIZED = 0 +MODE_INITIALIZING = 1 +MODE_READY = 2 +MODE_ERROR = 99 + + +class SmarGon(object): + def __init__(self): + super(SmarGon, self).__init__() + self.__dict__.update(target=None) + self.__dict__.update(bookmarks={}) + self.__dict__.update(_latest_message={}) + # pubsub.psubscribe(**{f"__keyspace@{SMARGON.value}__:*": self._cb_readbackSCS}) + # pubsub.run_in_thread(sleep_time=0.5, daemon=True) + + def __repr__(self): + BX, BY, BZ, OMEGA, CHI, PHI, a, b, c = self.readback_bcs().values() + return f"<{self.__class__.__name__} X={BX:.3f}, Y={BY:.3f}, Z={BZ:.3f}, CHI={CHI:.3f}, PHI={PHI:.3f}, OMEGA={OMEGA:.3f}>" + + def _cb_readbackSCS(self, msg): + if msg["data"] in ["hset"]: + self._latest_message = msg + + def move_home(self, wait=False) -> None: + """move to beamline coordinate system X, Y, Z, Chi, Phi = 0 0 0 0 0""" + self.apply_bookmark_sh({"shx": 0.0, "shy": 0.0, "shz": 18.0, "chi": 0.0, "phi": 0.0}) + if wait: + self.wait_home() + + def xyz(self, coords: Tuple[float, float, float], wait: bool = True) -> None: + """ + Move smargon in absolute beamline coordinates + + :param coords: a tuple of floats representing X, Y, Z coordinates + :type coords: + :param wait: + :type wait: + :return: + :rtype: + """ + x, y, z = coords + # the two steps below are necessary otherwise the control system + # remembers *a* previous CHI + bcs = self.bcs + bcs.update({"BX": x, "BY": y, "BZ": z}) + self.bcs = bcs + if wait: + self.wait() + + def wait_home(self, timeout: float = 20.0) -> None: + """ + wait for the smargon to reach its home position: + SHX = 0.0 + SHY = 0.0 + SHZ = 18.0 + CHI = 0.0 + PHI = 0.0 + + :param timeout: time to wait for positions to be reached raises TimeoutError if timeout reached + :type timeout: float + :return: + :rtype: + """ + tout = timeout + time() + in_place = [False, False] + rbv = -999.0 + while not all(in_place) and time() < tout: + rbv = self.readback_scs() + in_place = [] + for k, v in { + "SHX": 0.0, + "SHY": 0.0, + "SHZ": 18.0, + "CHI": 0.0, + "PHI": 0.0, + }.items(): + in_place.append(abs(rbv[k] - v) < 0.01) + if time() > tout: + raise TimeoutError(f"timeout waiting for smargon to reach home position: {rbv}") + + def push_bookmark(self): + """ + save current absolute coordinates in FIFO stack + :return: + :rtype: + """ + t = round(time()) + self.bookmarks[t] = self.readback_scs() + + def pop_bookmark(self): + return self.bookmarks.popitem()[1] + + def apply_bookmark_sh(self, scs): + scsput(**scs) + + def apply_last_bookmark_sh(self): + scs = self.pop_bookmark() + scsput(**scs) + + def readback_mcs(self): + """current motor positions of the smargon sliders""" + return gonget("readbackMCS") + + def readback_scs(self): + """current SH coordinates of the smargon model""" + return gonget("readbackSCS") + + def readback_bcs(self): + """current beamline coordinates of the smargon""" + return gonget("readbackBCS") + + def target_scs(self): + """currently assigned targets for the smargon control system""" + return gonget("targetSCS") + + def initialize(self): + """initialize the smargon""" + self.set_mode(MODE_UNINITIALIZED) + sleep(0.1) + self.set_mode(MODE_INITIALIZING) + + def set_mode(self, mode: int): + """put smargon control system in a given mode + MODE_UNINITIALIZED = 0 + MODE_INITIALIZING = 1 + MODE_READY = 2 + MODE_ERROR = 99 + """ + gonput(f"mode?mode={mode}") + + def enable_correction(self): + """enable calibration based corrections""" + gonput("corr_type?corr_type=1") + + def disable_correction(self): + """disable calibration based corrections""" + gonput("corr_type?corr_type=0") + + def chi(self, val=None, wait=False): + if val is None: + return self.readback_scs()["CHI"] + scsput(CHI=val) + if wait: + timeout = 10 + time() + while time() < timeout: + if abs(val - self.readback_scs()["CHI"]) < 0.1: + break + if time() > timeout: + raise RuntimeError(f"SmarGon CHI did not reach requested target {val} in time") + + def phi(self, val=None, wait=False): + if val is None: + return self.readback_scs()["PHI"] + scsput(PHI=val) + if wait: + timeout = 70 + time() + while time() < timeout: + if abs(val - self.readback_scs()["PHI"]) < 0.1: + break + if time() > timeout: + raise RuntimeError(f"SmarGon PHI did not reach requested target {val} in time") + + def wait(self, timeout=60.0): + """waits up to `timeout` seconds for smargon to reach target""" + target = { + k.upper(): v for k, v in self.target_scs().items() if k.lower() in ("shx", "shy", "shz", "chi", "phi") + } + + timeout = timeout + time() + while time() < timeout: + s = { + k: (abs(v - target[k]) < 0.01) + for k, v in self.readback_scs().items() + if k.upper() in ("SHX", "SHY", "SHZ", "CHI", "PHI") + } + if all(list(s.values())): + break + if time() > timeout: + raise TimeoutError("timed out waiting for smargon to reach target") + + def __setattr__(self, key, value): + key = key.lower() + if key == "mode": + self.set_mode(value) + elif key == "correction": + assert value in ( + 0, + 1, + False, + True, + ), "correction is either 1 or True (enabled) or 0 (disabled)" + gonput(f"corr_type?corr_type?{value}") + elif key == "scs": + scsput(**value) + elif key == "bcs": + bcsput(**value) + elif key == "target": + if not isinstance(value, dict): + raise Exception(f"expected a dict with target axis and values got something else: {value}") + for k in value.keys(): + if k.lower() not in "shx shy shz chi phi ox oy oz".split(): + raise Exception(f'unknown axis in target "{k}"') + scsput(**value) + elif key in "shx shy shz chi phi ox oy oz".split(): + scsput(**{key: value}) + elif key in "bx by bz".split(): + bcs = self.readback_bcs() + bcs[key] = value + bcsput(**bcs) + else: + self.__dict__[key].update(value) + + def __getattr__(self, key): + key = key.lower() + if key == "mode": + return self.readback_mcs()["mode"] + elif key == "correction": + return gonget("corr_type") + elif key == "bcs": + return self.readback_bcs() + elif key == "mcs": + return self.readback_mcs() + elif key == "scs": + return self.readback_scs() + elif key in "shx shy shz chi phi ox oy oz".split(): + return self.readback_scs()[key.upper()] + elif key in "bx by bz".split(): + return self.readback_bcs()[key.upper()] + else: + return self.__getattribute__(key) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="SmarGon client") + parser.add_argument("-i", "--initialize", help="initialize smargon", action="store_true") + args = parser.parse_args() + + smargon = SmarGon() + + if args.initialize: + print("initializing smargon device") + import Aerotech + + print("moving aerotech back by 50mm") + abr = Aerotech.Abr() + + abr.incr_x(-50.0, wait=True, velo=100.0) + + print("issuing init command to smargon") + smargon.initialize() + sleep(0.5) + + print("waiting for init routine to complete") + while MODE_READY != smargon.mode: + sleep(0.5) + + print("moving smargon to HOME position") + smargon.move_home() + + print("moving aerotech to its previous position") + abr.incr_x(50.0, wait=True, velo=100.0) + exit(0)