More stuff added to assembly, new MotorRecord.

This commit is contained in:
2020-10-06 23:48:58 +02:00
parent b8cb68164d
commit 5d83c86cd2
7 changed files with 429 additions and 20 deletions
+3 -2
View File
@@ -55,10 +55,11 @@ class Alias:
name = [self.alias]
parent = self.parent
while not parent == None:
if (parent is base) or (parent is None):
break
name.append(parent.alias)
parent = parent.__dict__.get("parent", None)
if parent is base:
break
if joiner:
return joiner.join(reversed(name))
else:
+9
View File
@@ -358,6 +358,15 @@ components = [
"type": "eco.endstations.bernina_diffractometers:XRD",
"kwargs": {"Id": "SARES21-XRD", "configuration": config["xrd_config"]},
},
{
"args": [],
"name": "xrd_new",
"z_und": 142,
"desc": "Xray diffractometer",
"type": "eco.endstations.bernina_diffractometers:XRD_new",
"kwargs": {"Id": "SARES21-XRD", "configuration": config["xrd_config"]},
"lazy":False,
},
{
"args": [],
"name": "vonHamos",
+25
View File
@@ -421,6 +421,31 @@ class PvEnum:
s += "{:>4} {} {}\n".format(val, sel, name)
return s
class PvString:
def __init__(self, pvname, name=None, elog=None):
self.name = name
self.pvname = pvname
self._pv = PV(pvname)
self._elog = elog
self.alias = Alias(name, channel=self.pvname, channeltype="CA")
def get_current_value(self):
return self._pv.get()
def set_target_value(self, value, hold=False):
changer = lambda value: self._pv.put(bytes(value, "utf8"), wait=True)
return Changer(
target=value, parent=self, changer=changer, hold=hold, stopper=None
)
def __repr__(self):
return self.get_current_value()
def __call__(self, string=None):
if not string is None:
self.set_target_value(string)
else:
return self.get_current_value()
@default_representation
@spec_convenience
+16 -16
View File
@@ -3,56 +3,56 @@ from ..devices_general.adjustable import PvRecord
class MforceChannel(Assembly):
def __init__(self, pv_base=None):
def __init__(self, pv_base, port):
self.pv_base = pv_base # Example SARES20-MF1:
self.motor_nb = motor_nb # Example 15
self.port = port # Example 15
self._append(
PvRecord,
self.pv_base + self.motor_nb + "_RC",
self.pv_base + self.port + "_RC",
name="no_idea_what_this_is",
is_setting=True,
) # 8
)
self._append(
PvRecord,
self.pv_base + "MOT_" + self.motor_nb + ".DESC",
self.pv_base + "MOT_" + self.port + ".DESC",
name="display_name",
is_setting=True,
) # X-ray Eye
)
self._append(
PvRecord,
self.pv_base + "MOT_" + self.motor_nb + ".EGU",
self.pv_base + "MOT_" + self.port + ".EGU",
name="units",
is_setting=True,
) # %
)
self._append(
PvRecord,
self.pv_base + "MOT_" + self.motor_nb + ".MRES",
self.pv_base + "MOT_" + self.port + ".MRES",
name="motor_resolution",
is_setting=True,
) # 0.00002886
)
self._append(
PvRecord,
self.pv_base + "MOT_" + self.motor_nb + ".ERES",
self.pv_base + "MOT_" + self.port + ".ERES",
name="encoder_resolution",
is_setting=True,
) # 0.00002886
)
self._append(
PvRecord,
self.pv_base + "MOT_" + self.motor_nb + ".VELO",
self.pv_base + "MOT_" + self.port + ".VELO",
name="velocity",
is_setting=True,
) # 10
)
self._append(
PvRecord,
self.pv_base + self.motor_nb + "_set",
self.pv_base + self.port + "_set",
name="limit_switch_I",
is_setting=True,
) # IS=1,2,0
# IS=1,3,0 set wire 1 (1,3) to high limit, active when at 0
self._append(
PvRecord,
self.pv_base + self.motor_nb + "_set",
self.pv_base + self.port + "_set",
name="limit_switch_II",
is_setting=True,
) # IS=2,3,0
+232
View File
@@ -11,6 +11,8 @@ import colorama
from ..utilities.KeyPress import KeyPress
import sys, colorama
from .. import global_config
from ..elements.assembly import Assembly
from .adjustable import PvRecord, PvEnum, PvString
if hasattr(global_config, "elog"):
elog = global_config.elog
@@ -50,6 +52,236 @@ def _keywordChecker(kw_key_list_tups):
assert tkey in tlist, "Keyword %s should be one of %s" % (tkw, tlist)
@spec_convenience
@update_changes
class MotorRecord_new(Assembly):
def __init__(
self,
pvname,
name=None,
elog=None,
alias_fields={"readback": "RBV", "user_offset": "OFF"},
):
super().__init__(name=name)
self.pvname = pvname
self._motor = _Motor(pvname)
self._elog = elog
for an, af in alias_fields.items():
self.alias.append(
Alias(an, channel=".".join([pvname, af]), channeltype="CA")
)
self._currentChange = None
# self.description = EpicsString(pvname + ".DESC")
self._append(PvEnum,self.pvname+'.DIR',name='direction',is_setting=True)
self._append(PvRecord,self.pvname+'.OFF',name='offset',is_setting=True)
self._append(PvRecord,self.pvname+'.VELO',name='speed',is_setting=False)
self._append(PvRecord,self.pvname+'.ACCL',name='acceleration_time',is_setting=False)
self._append(PvRecord,self.pvname+'.LLM',name='limit_low',is_setting=False)
self._append(PvRecord,self.pvname+'.HLM',name='limit_high',is_setting=False)
self._append(PvEnum,self.pvname+'.SPMG',name='motor_state',is_setting=False)
self._append(PvString,self.pvname+'.EGU',name='unit',is_setting=False)
self._append(PvString,self.pvname+'.DESC',name='description',is_setting=False)
# Conventional methods and properties for all Adjustable objects
def set_target_value(self, value, hold=False, check=True):
""" Adjustable convention"""
def changer(value):
self._status = self._motor.move(value, ignore_limits=(not check), wait=True)
self._status_message = _status_messages[self._status]
if self._status < 0:
raise AdjustableError(self._status_message)
elif self._status > 0:
print("\n")
print(self._status_message)
# changer = lambda value: self._motor.move(\
# value, ignore_limits=(not check),
# wait=True)
return Changer(
target=value,
parent=self,
changer=changer,
hold=hold,
stopper=self._motor.stop,
)
def stop(self):
""" Adjustable convention"""
try:
self._currentChange.stop()
except:
self._motor.stop()
pass
def get_current_value(self, posType="user", readback=True):
""" Adjustable convention"""
_keywordChecker([("posType", posType, _posTypes)])
if posType == "user":
return self._motor.get_position(readback=readback)
if posType == "dial":
return self._motor.get_position(readback=readback, dial=True)
if posType == "raw":
return self._motor.get_position(readback=readback, raw=True)
def reset_current_value_to(self, value, posType="user"):
""" Adjustable convention"""
_keywordChecker([("posType", posType, _posTypes)])
if posType == "user":
return self._motor.set_position(value)
if posType == "dial":
return self._motor.set_position(value, dial=True)
if posType == "raw":
return self._motor.set_position(value, raw=True)
def get_moveDone(self):
""" Adjustable convention"""
""" 0: moving 1: move done"""
return PV(str(self.Id + ".DMOV")).value
def set_limits(
self, low_limit, high_limit, posType="user", relative_to_present=False
):
"""
set limits. usage: set_limits(low_limit, high_limit)
"""
_keywordChecker([("posType", posType, _posTypes)])
ll_name, hl_name = "LLM", "HLM"
if posType is "dial":
ll_name, hl_name = "DLLM", "DHLM"
if relative_to_present:
v = self.get_current_value(posType=posType)
low_limit = v + low_limit
high_limit = v + high_limit
self._motor.put(ll_name, low_limit)
self._motor.put(hl_name, high_limit)
def add_value_callback(self, callback, index=None):
return self._motor.get_pv("RBV").add_callback(callback=callback, index=index)
def clear_value_callback(self, index=None):
if index:
self._motor.get_pv("RBV").remove_callback(index)
else:
self._motor.get_pv("RBV").clear_callbacks()
def get_limits(self, posType="user"):
""" Adjustable convention"""
_keywordChecker([("posType", posType, _posTypes)])
ll_name, hl_name = "LLM", "HLM"
if posType is "dial":
ll_name, hl_name = "DLLM", "DHLM"
return self._motor.get(ll_name), self._motor.get(hl_name)
def gui(self, guiType="xdm"):
""" Adjustable convention"""
cmd = ["caqtdm", "-macro"]
cmd.append('"P=%s:,M=%s"' % tuple(self.Id.split(":")))
# cmd.append('/sf/common/config/qt/motorx_more.ui')
cmd.append("motorx_more.ui")
# os.system(' '.join(cmd))
return subprocess.Popen(" ".join(cmd), shell=True)
# return string with motor value as variable representation
def __str__(self):
# """ return short info for the current motor"""
s = f"{self.name}"
s += f"\t@ {colorama.Style.BRIGHT}{self.get_current_value():1.6g}{colorama.Style.RESET_ALL} (dial @ {self.get_current_value(posType='dial'):1.6g})"
# # s += "\tuser limits (low,high) : {:1.6g},{:1.6g}\n".format(*self.get_limits())
s += f"\n{colorama.Style.DIM}low limit {colorama.Style.RESET_ALL}"
s += ValueInRange(*self.get_limits()).get_str(self.get_current_value())
s += f" {colorama.Style.DIM}high limit{colorama.Style.RESET_ALL}"
# # s += "\tuser limits (low,high) : {:1.6g},{1.6g}".format(self.get_limits())
return s
def __repr__(self):
print(str(self))
return object.__repr__(self)
def __call__(self, value):
self._currentChange = self.set_target_value(value)
def _tweak_ioc(self, step_value=None):
pv = self._motor.get_pv("TWV")
pvf = self._motor.get_pv("TWF")
pvr = self._motor.get_pv("TWR")
if not step_value:
step_value = pv.get()
print(f"Tweaking {self.name} at step size {step_value}", end="\r")
help = "q = exit; up = step*2; down = step/2, left = neg dir, right = pos dir\n"
help = help + "g = go abs, s = set"
print(f"tweaking {self.name}")
print(help)
print(f"Starting at {self.get_current_value()}")
step_value = float(step_value)
oldstep = 0
k = KeyPress()
cll = colorama.ansi.clear_line()
class Printer:
def print(self, **kwargs):
print(
cll + f"stepsize: {self.stepsize}; current: {kwargs['value']}",
end="\r",
)
p = Printer()
print(" ")
p.stepsize = step_value
p.print(value=self.get_current_value())
ind_callback = self.add_value_callback(p.print)
pv.put(step_value)
while k.isq() is False:
if oldstep != step_value:
p.stepsize = step_value
p.print(value=self.get_current_value())
oldstep = step_value
k.waitkey()
if k.isu():
step_value = step_value * 2.0
pv.put(step_value)
elif k.isd():
step_value = step_value / 2.0
pv.put(step_value)
elif k.isr():
pvf.put(1)
elif k.isl():
pvr.put(1)
elif k.iskey("g"):
print("enter absolute position (char to abort go to)")
sys.stdout.flush()
v = sys.stdin.readline()
try:
v = float(v.strip())
self.set_target_value(v)
except:
print("value cannot be converted to float, exit go to mode ...")
sys.stdout.flush()
elif k.iskey("s"):
print("enter new set value (char to abort setting)")
sys.stdout.flush()
v = sys.stdin.readline()
try:
v = float(v[0:-1])
self.reset_current_value_to(v)
except:
print("value cannot be converted to float, exit go to mode ...")
sys.stdout.flush()
elif k.isq():
break
else:
print(help)
self.clear_value_callback(index=ind_callback)
print(f"final position: {self.get_current_value()}")
print(f"final tweak step: {pv.get()}")
def tweak(self, *args, **kwargs):
return self._tweak_ioc(*args, **kwargs)
@spec_convenience
@update_changes
class MotorRecord:
+17 -2
View File
@@ -1,10 +1,14 @@
from ..aliases import Alias
from tabulate import tabulate
import colorama
class Assembly:
def __init__(self,name=None,is_alias=True):
self.name = name
if is_alias:
self.alias = Alias(name)
self.settings = []
self.status_indicators = []
def _append(self,foo_obj_init, *args, name=None, is_setting=False, is_status=True, is_alias=True, **kwargs):
@@ -19,11 +23,22 @@ class Assembly:
def get_status(self):
return {
'settings':{ts.alias.get_full_name(base=self):ts.get_current_value() for ts in self.settings},
'status_indictors':{ts.alias.get_full_name(base=self):ts.get_current_value() for ts in self.status_indicators},
'status_indicators':{ts.alias.get_full_name(base=self):ts.get_current_value() for ts in self.status_indicators},
}
def status(self):
stat = self.get_status()
s = tabulate(
[[colorama.Style.BRIGHT+name+colorama.Style.RESET_ALL, value] for name,value in stat['settings'].items()]
+[[name, value] for name,value in stat['status_indicators'].items()]
)
return s
def __repr__(self):
pass
stat = self.get_status()
s = tabulate([[name, value] for name,value in stat['status_indicators'].items()])
return s
+127
View File
@@ -9,6 +9,7 @@ from ..aliases import Alias, append_object_to_object
from ..endstations.hexapod import HexapodPI
from pathlib import Path
import subprocess
from ..elements.assembly import Assembly
def addMotorRecordToSelf(self, name=None, Id=None):
@@ -87,6 +88,132 @@ class GPS:
return self.get_adjustable_positions_str()
class XRD_new(Assembly):
def __init__(self, name=None, Id=None, configuration=["base"]):
"""X-ray diffractometer platform in AiwssFEL Bernina.\
<configuration> : list of elements mounted on
the plaform, options are kappa, nutable, hlgonio, polana"""
# self.Id = Id
super().__init__(name=name)
self.configuration = configuration
if "base" in self.configuration:
### motors base platform ###
### motors base platform ###
self._append(MotorRecord,Id + ":MOT_TX", name="xbase")
self._append(MotorRecord,Id + ":MOT_TY", name="ybase")
self._append(MotorRecord,Id + ":MOT_RX", name="rxbase")
self._append(MotorRecord,Id + ":MOT_MY_RYTH", name="alpha")
if "arm" in self.configuration:
### motors XRD detector arm ###
self._append(MotorRecord,Id + ":MOT_NY_RY2TH", name="gamma")
self._append(MotorRecord,Id + ":MOT_DT_RX2TH", name="delta")
### motors XRD area detector branch ###
self._append(MotorRecord,Id + ":MOT_D_T", name="tdet")
### motors XRD polarisation analyzer branch ###
self._append(MotorRecord,Id + ":MOT_P_T", name="tpol")
# missing: slits of flight tube
if "hlxz" in self.configuration:
### motors heavy load goniometer ###
self._append(MotorRecord,Id + ":MOT_TBL_TX", name="xhl")
self._append(MotorRecord,Id + ":MOT_TBL_TZ", name="zhl")
if "hly" in self.configuration:
self._append(MotorRecord,Id + ":MOT_TBL_TY", name="yhl")
if "hlrxrz" in self.configuration:
try:
self._append(MotorRecord,Id + ":MOT_TBL_RX", name="rxhl")
except:
print("XRD.rxhl not found")
pass
try:
self._append(MotorRecord,Id + ":MOT_TBL_RY", name="rzhl")
except:
print("XRD.rzhl not found")
pass
if "phi_table" in self.configuration:
### motors nu table ###
self._append(MotorRecord,Id + ":MOT_HEX_TX", name="tphi")
self._append(MotorRecord,Id + ":MOT_HEX_RX", name="phi")
if "phi_hex" in self.configuration:
### motors PI hexapod ###
append_object_to_object(
self,
PvRecord,
"SARES20-HEX_PI:SET-POSI-X",
pvreadbackname="SARES20-HEX_PI:POSI-X",
name="xhex",
)
append_object_to_object(
self,
PvRecord,
"SARES20-HEX_PI:SET-POSI-Y",
pvreadbackname="SARES20-HEX_PI:POSI-Y",
name="yhex",
)
append_object_to_object(
self,
PvRecord,
"SARES20-HEX_PI:SET-POSI-Z",
pvreadbackname="SARES20-HEX_PI:POSI-Z",
name="zhex",
)
append_object_to_object(
self,
PvRecord,
"SARES20-HEX_PI:SET-POSI-U",
pvreadbackname="SARES20-HEX_PI:POSI-U",
name="uhex",
)
append_object_to_object(
self,
PvRecord,
"SARES20-HEX_PI:SET-POSI-V",
pvreadbackname="SARES20-HEX_PI:POSI-V",
name="vhex",
)
append_object_to_object(
self,
PvRecord,
"SARES20-HEX_PI:SET-POSI-W",
pvreadbackname="SARES20-HEX_PI:POSI-W",
name="whex",
)
if "kappa" in self.configuration:
self._append(MotorRecord,"SARES21-XRD:MOT_KAP_KRX", name="eta")
self._append(MotorRecord, "SARES21-XRD:MOT_KAP_KAP", name="kappa")
self._append(MotorRecord, "SARES21-XRD:MOT_KAP_KPH", name="phi")
self._append(MotorRecord, "SARES21-XRD:MOT_KAP_DTY", name="zkap")
self._append(MotorRecord, "SARES21-XRD:MOT_KAP_DTX", name="xkap")
self._append(MotorRecord, "SARES21-XRD:MOT_KAP_DTZ", name="ykap")
self._append(MotorRecord, "SARES21-XRD:MOT_KAP_DRX", name="rxkap")
self._append(MotorRecord, "SARES21-XRD:MOT_KAP_DRZ", name="rykap")
def get_adjustable_positions_str(self):
ostr = "*****XRD motor positions******\n"
for tkey, item in self.__dict__.items():
if hasattr(item, "get_current_value"):
pos = item.get_current_value()
ostr += " " + tkey.ljust(17) + " : % 14g\n" % pos
return ostr
def gui(self, guiType="xdm"):
""" Adjustable convention"""
cmd = ["caqtdm", "-macro"]
cmd = ['-noMsg', '-stylefile', 'sfop.qss','-macro', 'P=SARES21-XRD', '/sf/common/config/qt/ESB_XRD_exp.ui']
return subprocess.Popen(" ".join(cmd), shell=True)
# def __repr__(self):
# return self.get_adjustable_positions_str()
class XRD:
def __init__(self, name=None, Id=None, configuration=["base"]):
"""X-ray diffractometer platform in AiwssFEL Bernina.\