added det_diff
This commit is contained in:
@@ -35,7 +35,7 @@ class Epicstools:
|
||||
else:
|
||||
self.channel_list = channel_list
|
||||
for channel in self.channel_list:
|
||||
self.channels.append(PV(channel))
|
||||
self.channels.append(PV(channel, auto_monitor=True))
|
||||
|
||||
def h5(self, fina=None, channel_list=None, N_pulses=None, queue_size=100):
|
||||
channel_list = self.channel_list
|
||||
|
||||
@@ -52,7 +52,11 @@ class Alias:
|
||||
|
||||
def get_full_name(self, base=None, joiner="."):
|
||||
"""allembles full name with parent names down to base (is supplied). Joiner is the separator between the hirarchical names."""
|
||||
name = [self.alias]
|
||||
if (not (base is None)) and (self is base.alias):
|
||||
name = []
|
||||
return ""
|
||||
else:
|
||||
name = [self.alias]
|
||||
parent = self.parent
|
||||
while not parent == None:
|
||||
if (not (base is None) and (parent is base.alias)) or (parent is None):
|
||||
|
||||
+26
-5
@@ -199,9 +199,26 @@ components = [
|
||||
"type": "eco.xoptics.offsetMirrors:OffsetMirror",
|
||||
"kwargs": {"Id": "SAROP21-OOMV096"},
|
||||
},
|
||||
{
|
||||
"name": "offset",
|
||||
"args": [],
|
||||
"kwargs": {},
|
||||
"z_und": 96,
|
||||
"desc": "offset mirrors in pink mode",
|
||||
"type": "eco.xoptics.offsetMirrors_new:OffsetMirrorsBernina",
|
||||
"kwargs": {},
|
||||
},
|
||||
{
|
||||
"name": "mono",
|
||||
"args": ["SAROP21-ODCM098"],
|
||||
"kwargs": {},
|
||||
"z_und": 98,
|
||||
"desc": "DCM Monochromator",
|
||||
"type": "eco.xoptics.dcm_new:DoubleCrystalMono",
|
||||
},
|
||||
{
|
||||
"name": "mono_old",
|
||||
"args": ["SAROP21-ODCM098"],
|
||||
"kwargs": {
|
||||
"energy_sp": "SAROP21-ARAMIS:ENERGY_SP",
|
||||
"energy_rb": "SAROP21-ARAMIS:ENERGY",
|
||||
@@ -375,19 +392,23 @@ components = [
|
||||
},
|
||||
{
|
||||
"args": [],
|
||||
"name": "xrd",
|
||||
"name": "xrd_old",
|
||||
"z_und": 142,
|
||||
"desc": "Xray diffractometer",
|
||||
"type": "eco.endstations.bernina_diffractometers:XRD",
|
||||
"type": "eco.endstations.bernina_diffractometers:XRD_old",
|
||||
"kwargs": {"Id": "SARES21-XRD", "configuration": config["xrd_config"]},
|
||||
},
|
||||
{
|
||||
"args": [],
|
||||
"name": "xrd_new",
|
||||
"name": "xrd",
|
||||
"z_und": 142,
|
||||
"desc": "Xray diffractometer",
|
||||
"type": "eco.endstations.bernina_diffractometers:XRD_new",
|
||||
"kwargs": {"Id": "SARES21-XRD", "configuration": config["xrd_config"]},
|
||||
"type": "eco.endstations.bernina_diffractometers:XRD",
|
||||
"kwargs": {
|
||||
"Id": "SARES21-XRD",
|
||||
"configuration": config["xrd_config"],
|
||||
"diff_detector": {"jf_id": "JF01T03V01"},
|
||||
},
|
||||
"lazy": False,
|
||||
},
|
||||
{
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
from ..devices_general.adjustable import PvRecord, AdjustableVirtual
|
||||
from ..elements import Assembly
|
||||
from ..aliases import Alias
|
||||
from ..elements import memory
|
||||
|
||||
|
||||
class Jungfrau(Assembly):
|
||||
def __init__(
|
||||
self,
|
||||
jf_id,
|
||||
pv_trigger="SAR-CVME-TIFALL5-EVG0:SoftEvt-EvtCode-SP",
|
||||
trigger_on=254,
|
||||
trigger_off=255,
|
||||
name=None,
|
||||
):
|
||||
self.name = name
|
||||
self.alias = Alias(name, channel=jf_id, channeltype="JF")
|
||||
self.settings = []
|
||||
self.status_indicators = []
|
||||
self.view_toplevel_only = []
|
||||
if memory.global_memory_dir:
|
||||
self.memory = memory.Memory(self)
|
||||
|
||||
self.jf_id = jf_id
|
||||
self._append(PvRecord, pv_trigger, is_status=True, name="trigger")
|
||||
self._trigger_on = trigger_on
|
||||
self._trigger_off = trigger_off
|
||||
self._append(
|
||||
AdjustableVirtual,
|
||||
[self.trigger],
|
||||
lambda value: value == self._trigger_on,
|
||||
self._set_trigger_enable,
|
||||
name="trigger_enable",
|
||||
append_aliases=False,
|
||||
is_setting=True,
|
||||
)
|
||||
|
||||
def _set_trigger_enable(self, value):
|
||||
if value:
|
||||
self.trigger.set_target_value(self._trigger_on).wait()
|
||||
else:
|
||||
self.trigger.set_target_value(self._triggeroff).wait()
|
||||
+44
-16
@@ -1,14 +1,16 @@
|
||||
from ..aliases import Alias
|
||||
from tabulate import tabulate
|
||||
import colorama
|
||||
from . import memory
|
||||
from . import memory
|
||||
|
||||
|
||||
class Assembly:
|
||||
def __init__(self, name=None, parent=None, is_alias=True):
|
||||
self.name = name
|
||||
self.alias = Alias(name,parent=parent)
|
||||
self.alias = Alias(name, parent=parent)
|
||||
self.settings = []
|
||||
self.status_indicators = []
|
||||
self.view_toplevel_only = []
|
||||
if memory.global_memory_dir:
|
||||
self.memory = memory.Memory(self)
|
||||
|
||||
@@ -20,6 +22,7 @@ class Assembly:
|
||||
is_setting=False,
|
||||
is_status=True,
|
||||
is_alias=True,
|
||||
view_toplevel_only=True,
|
||||
**kwargs
|
||||
):
|
||||
self.__dict__[name] = foo_obj_init(*args, **kwargs, name=name)
|
||||
@@ -31,28 +34,31 @@ class Assembly:
|
||||
self.settings.append(self.__dict__[name])
|
||||
if (not is_setting) and is_status:
|
||||
self.status_indicators.append(self.__dict__[name])
|
||||
if view_toplevel_only:
|
||||
self.view_toplevel_only.append(self.__dict__[name])
|
||||
|
||||
def get_status(self,base=None):
|
||||
def get_status(self, base=None):
|
||||
if base is None:
|
||||
base = self
|
||||
settings = {}
|
||||
status_indicators = {}
|
||||
for ts in self.settings:
|
||||
if (not (ts is self)) and hasattr(ts,'get_status'):
|
||||
tstat = ts.get_status(base=self)
|
||||
settings.update(tstat['settings'])
|
||||
status_indicators.update(tstat['status_indicators'])
|
||||
if (not (ts is self)) and hasattr(ts, "get_status"):
|
||||
tstat = ts.get_status(base=base)
|
||||
settings.update(tstat["settings"])
|
||||
status_indicators.update(tstat["status_indicators"])
|
||||
else:
|
||||
settings[ts.alias.get_full_name(base=base)] = ts.get_current_value()
|
||||
for ts in self.status_indicators:
|
||||
if (not (ts is self)) and hasattr(ts,'get_status'):
|
||||
if (not (ts is self)) and hasattr(ts, "get_status"):
|
||||
tstat = ts.get_status()
|
||||
settings.update(tstat['settings'])
|
||||
status_indicators.update(tstat['status_indicators'])
|
||||
settings.update(tstat["settings"])
|
||||
status_indicators.update(tstat["status_indicators"])
|
||||
else:
|
||||
status_indicators[ts.alias.get_full_name(base=base)] = ts.get_current_value()
|
||||
return {'settings':settings,'status_indicators':status_indicators}
|
||||
|
||||
status_indicators[
|
||||
ts.alias.get_full_name(base=base)
|
||||
] = ts.get_current_value()
|
||||
return {"settings": settings, "status_indicators": status_indicators}
|
||||
|
||||
def status(self, get_string=False):
|
||||
stat = self.get_status()
|
||||
@@ -68,7 +74,29 @@ class Assembly:
|
||||
else:
|
||||
print(s)
|
||||
|
||||
def __repr__(self):
|
||||
stat = self.get_status()
|
||||
s = tabulate([[name, value] for name, value in stat["settings"].items()])
|
||||
def get_status_str(self, base=None, stat_fields=["settings"]):
|
||||
stat = self.get_status(base=base)
|
||||
stat_filt = {}
|
||||
for stat_field in stat_fields:
|
||||
tstat = stat[stat_field]
|
||||
for to in self.view_toplevel_only:
|
||||
tname = to.alias.get_full_name(base=base)
|
||||
tstat = filter_names(tname, tstat)
|
||||
stat_filt[stat_field] = tstat
|
||||
s = tabulate([[name, value] for name, value in stat_filt[stat_field].items()])
|
||||
return s
|
||||
|
||||
def __repr__(self):
|
||||
return self.get_status_str(base=self)
|
||||
|
||||
|
||||
def filter_names(name, stat_dict):
|
||||
out = {}
|
||||
for key, value in stat_dict.items():
|
||||
keys = key.split(".")
|
||||
if keys[0] == name:
|
||||
if len(keys) == 1:
|
||||
out[key] = value
|
||||
else:
|
||||
out[key] = value
|
||||
return out
|
||||
|
||||
+197
-32
@@ -1,87 +1,252 @@
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from datetime import datetime
|
||||
from ..devices_general.adjustable import AdjustableFS
|
||||
from ..utilities.KeyPress import KeyPress
|
||||
from tabulate import tabulate
|
||||
import sys, colorama
|
||||
|
||||
global_memory_dir = None
|
||||
|
||||
def set_global_memory_dir(dirpath,mode='w'):
|
||||
globals()['global_memory_dir'] = Path(dirpath).expanduser()
|
||||
|
||||
def set_global_memory_dir(dirpath, mode="w"):
|
||||
globals()["global_memory_dir"] = Path(dirpath).expanduser()
|
||||
|
||||
|
||||
def get_memory(name):
|
||||
if not (global_memory_dir is None):
|
||||
return Memory(name)
|
||||
|
||||
|
||||
class Memory:
|
||||
def __init__(self,obj,memory_dir=global_memory_dir,categories={'recall':['settings'],'track':['status_indicators']}):
|
||||
def __init__(
|
||||
self,
|
||||
obj,
|
||||
memory_dir=global_memory_dir,
|
||||
categories={"recall": ["settings"], "track": ["status_indicators"]},
|
||||
):
|
||||
self.obj_parent = obj
|
||||
self.categories = categories
|
||||
if not memory_dir:
|
||||
memory_dir = global_memory_dir
|
||||
self.base_dir = Path(memory_dir)
|
||||
|
||||
def setup_path(self):
|
||||
def setup_path(self):
|
||||
name = self.obj_parent.alias.get_full_name(joiner=None)
|
||||
self.dir = Path(self.base_dir) / Path('/'.join(reversed(name)))
|
||||
self.memories = AdjustableFS(self.dir/Path('memories.json'),default_value={})
|
||||
self.dir = Path(self.base_dir) / Path("/".join(reversed(name)))
|
||||
self.memories = AdjustableFS(self.dir / Path("memories.json"), default_value={})
|
||||
try:
|
||||
self.dir.mkdir(exist_ok=True)
|
||||
except:
|
||||
print('Could not create memory directory')
|
||||
|
||||
print("Could not create memory directory")
|
||||
|
||||
def __str__(self):
|
||||
self.setup_path()
|
||||
mem = self.memories()
|
||||
a = []
|
||||
for n,(key,content) in enumerate(mem.items()):
|
||||
for n, (key, content) in enumerate(mem.items()):
|
||||
row = [n]
|
||||
t = datetime.fromisoformat(key)
|
||||
row.append(t.strftime('%Y-%m-%d: %a %-H:%M'))
|
||||
row.append(content['message'])
|
||||
row.append(t.strftime("%Y-%m-%d: %a %-H:%M"))
|
||||
row.append(content["message"])
|
||||
a.append(row)
|
||||
return (tabulate(a,headers=["Index","Time","Message"]))
|
||||
return tabulate(a, headers=["Index", "Time", "Message"])
|
||||
|
||||
def __call__(self, index):
|
||||
# print(self.get_memory_difference_str(index))
|
||||
self.recall(index)
|
||||
|
||||
|
||||
def memorize(self, message=None, attributes={}, force_message=True):
|
||||
self.setup_path()
|
||||
stat_now = self.obj_parent.get_status()
|
||||
stat_now['memorized_attributes'] = attributes
|
||||
stat_now = self.obj_parent.get_status(base=self.obj_parent)
|
||||
stat_now["memorized_attributes"] = attributes
|
||||
key = datetime.now().isoformat()
|
||||
mem = self.memories()
|
||||
if force_message:
|
||||
while not message:
|
||||
message = input("Please enter a message associated to this memory entry:\n>>> ")
|
||||
mem[key] = {'message':message,'categories':self.categories}
|
||||
tmp = AdjustableFS(self.dir / Path(key + '.json'))
|
||||
message = input(
|
||||
"Please enter a message associated to this memory entry:\n>>> "
|
||||
)
|
||||
mem[key] = {"message": message, "categories": self.categories}
|
||||
tmp = AdjustableFS(self.dir / Path(key + ".json"))
|
||||
tmp(stat_now)
|
||||
self.memories(mem)
|
||||
|
||||
def get_memory(self,index=None,key=None):
|
||||
def get_memory(self, index=None, key=None):
|
||||
self.setup_path()
|
||||
if not (index is None):
|
||||
key = list(self.memories().keys())[index]
|
||||
tmp = AdjustableFS(self.dir / Path(key + '.json'))
|
||||
tmp = AdjustableFS(self.dir / Path(key + ".json"))
|
||||
return tmp()
|
||||
|
||||
def recall(self,memory_index=None,key=None):
|
||||
# mem = self.get_memory(index=memory_index,key=key)
|
||||
# rec = mem['settings']
|
||||
# for n,(key,value) in enumerate(rec.items()):
|
||||
# row = [n]
|
||||
# present_value = key.split('.')
|
||||
|
||||
def recall(self, memory_index=None, key=None, wait=True, show_changes_only=True):
|
||||
select = self.select_from_memory(
|
||||
memory_index, show_changes_only=show_changes_only
|
||||
)
|
||||
if not select:
|
||||
return
|
||||
mem = self.get_memory(index=memory_index)
|
||||
rec = mem["settings"]
|
||||
if not input("would you really like to do the change? (y/n):") == "y":
|
||||
return
|
||||
changes = []
|
||||
for sel, (key, val) in zip(select, rec.items()):
|
||||
if sel:
|
||||
to = name2obj(self.obj_parent, key)
|
||||
print(f"Changing {key} from {to.get_current_value()} to {val}")
|
||||
changes.append(to.set_target_value(val))
|
||||
if wait:
|
||||
for change in changes:
|
||||
change.wait()
|
||||
return
|
||||
else:
|
||||
return changes
|
||||
|
||||
def get_memory_difference_str(
|
||||
self, memory_index, select=None, ask_select=True, show_changes_only=False
|
||||
):
|
||||
mem = self.get_memory(index=memory_index)
|
||||
rec = mem["settings"]
|
||||
if not select:
|
||||
select = [True] * len(rec)
|
||||
table = []
|
||||
for n, (tsel, (key, recall_value)) in enumerate(zip(select, rec.items())):
|
||||
present_value = name2obj(self.obj_parent, key).get_current_value()
|
||||
if tsel:
|
||||
tselstr = "x"
|
||||
else:
|
||||
tselstr = " "
|
||||
if present_value == recall_value:
|
||||
changed = False
|
||||
comp_indicator = (
|
||||
colorama.Fore.GREEN
|
||||
+ colorama.Style.BRIGHT
|
||||
+ "=="
|
||||
+ colorama.Style.RESET_ALL
|
||||
)
|
||||
else:
|
||||
changed = True
|
||||
if not tsel:
|
||||
comp_indicator = f"not changed ({recall_value-present_value:+g})"
|
||||
else:
|
||||
comp_indicator = (
|
||||
colorama.Fore.RED
|
||||
+ colorama.Style.BRIGHT
|
||||
+ f"--({recall_value-present_value:+g})-->"
|
||||
+ colorama.Style.RESET_ALL
|
||||
)
|
||||
if show_changes_only and (not changed):
|
||||
continue
|
||||
table.append([n, tselstr, key, present_value, comp_indicator, recall_value])
|
||||
|
||||
return tabulate(
|
||||
table,
|
||||
headers=[
|
||||
"",
|
||||
"",
|
||||
"name",
|
||||
"present",
|
||||
"",
|
||||
"memory",
|
||||
],
|
||||
colalign=("decimal", "center", "left", "decimal", "center", "decimal"),
|
||||
)
|
||||
|
||||
def select_from_memory(self, memory_index, show_changes_only=True):
|
||||
mem = self.get_memory(index=memory_index)
|
||||
rec = mem["settings"]
|
||||
k = KeyPress()
|
||||
# cll = colorama.ansi.clear_line()
|
||||
|
||||
help = "Change selection pressing keys followed by numbered seelection \n"
|
||||
help += " o : Select only (enter comma-separated row numbers)\n"
|
||||
help += " a : Select additionally (enter comma-separated row numbers)\n"
|
||||
help += " e : Exclude from selection (enter comma-separated row numbers)\n"
|
||||
help += " r : recall selected memory\n"
|
||||
help += " q : quit\n"
|
||||
|
||||
class Printer:
|
||||
def __init__(self, o=self):
|
||||
self.o = o
|
||||
self.len = len(rec)
|
||||
self.select = [True] * self.len
|
||||
|
||||
def print(self, **kwargs):
|
||||
print(
|
||||
self.o.get_memory_difference_str(
|
||||
memory_index,
|
||||
select=self.select,
|
||||
show_changes_only=show_changes_only,
|
||||
)
|
||||
)
|
||||
print(help)
|
||||
|
||||
def select_only(self):
|
||||
v = self.get_array()
|
||||
self.select = [False] * self.len
|
||||
for tv in v:
|
||||
self.select[tv] = True
|
||||
|
||||
def select_additional(self):
|
||||
v = self.get_array()
|
||||
for tv in v:
|
||||
self.select[tv] = True
|
||||
|
||||
def exclude(self):
|
||||
v = self.get_array()
|
||||
for tv in v:
|
||||
self.select[tv] = False
|
||||
|
||||
def get_array(self):
|
||||
sys.stdout.flush()
|
||||
v = sys.stdin.readline()
|
||||
try:
|
||||
v = v.split(",")
|
||||
v = [int(tv) for tv in v]
|
||||
print(v)
|
||||
return v
|
||||
except:
|
||||
print(
|
||||
"value cannot be converted to listed integers, please try again!"
|
||||
)
|
||||
sys.stdout.flush()
|
||||
return self.get_array()
|
||||
|
||||
p = Printer()
|
||||
while k.isq() is False:
|
||||
p.print()
|
||||
k.waitkey()
|
||||
if k.iskey("o"):
|
||||
print("Select only: ")
|
||||
p.select_only()
|
||||
elif k.iskey("a"):
|
||||
print("Append to selection: ")
|
||||
p.select_additional()
|
||||
elif k.iskey("e"):
|
||||
print("Exclude from selection: ")
|
||||
p.exclude()
|
||||
elif k.isq():
|
||||
return
|
||||
elif k.iskey("r"):
|
||||
return p.select
|
||||
else:
|
||||
# print(help)
|
||||
pass
|
||||
|
||||
# stat_now = self.obj_parent.get_status()
|
||||
# for mem
|
||||
pass
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
|
||||
def name2obj(obj_parent,name, delimiter='.'):
|
||||
def name2obj(obj_parent, name, delimiter="."):
|
||||
if type(name) is str:
|
||||
name = name.split(delimiter)
|
||||
obj = obj_parent
|
||||
for tn in name:
|
||||
obj = obj.__dict__[tn]
|
||||
return name
|
||||
if not tn:
|
||||
obj = obj
|
||||
else:
|
||||
obj = obj.__dict__[tn]
|
||||
|
||||
return obj
|
||||
@@ -10,6 +10,7 @@ from ..endstations.hexapod import HexapodPI
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
from ..elements.assembly import Assembly
|
||||
from ..detector.jungfrau import Jungfrau
|
||||
|
||||
|
||||
def addMotorRecordToSelf(self, name=None, Id=None):
|
||||
@@ -88,8 +89,8 @@ class GPS:
|
||||
return self.get_adjustable_positions_str()
|
||||
|
||||
|
||||
class XRD_new(Assembly):
|
||||
def __init__(self, name=None, Id=None, configuration=["base"]):
|
||||
class XRD(Assembly):
|
||||
def __init__(self, name=None, Id=None, configuration=["base"], diff_detector=None):
|
||||
"""X-ray diffractometer platform in AiwssFEL Bernina.\
|
||||
<configuration> : list of elements mounted on
|
||||
the plaform, options are kappa, nutable, hlgonio, polana"""
|
||||
@@ -241,6 +242,8 @@ class XRD_new(Assembly):
|
||||
name="rykap",
|
||||
is_setting=True,
|
||||
)
|
||||
if diff_detector:
|
||||
self._append(Jungfrau, diff_detector["jf_id"], name="det_diff")
|
||||
|
||||
def get_adjustable_positions_str(self):
|
||||
ostr = "*****XRD motor positions******\n"
|
||||
@@ -268,7 +271,7 @@ class XRD_new(Assembly):
|
||||
# return self.get_adjustable_positions_str()
|
||||
|
||||
|
||||
class XRD:
|
||||
class XRD_old:
|
||||
def __init__(self, name=None, Id=None, configuration=["base"]):
|
||||
"""X-ray diffractometer platform in AiwssFEL Bernina.\
|
||||
<configuration> : list of elements mounted on
|
||||
|
||||
+77
-1
@@ -1,4 +1,4 @@
|
||||
from ..devices_general.motors import MotorRecord
|
||||
from ..devices_general.motors import MotorRecord, MotorRecord_new
|
||||
from ..devices_general.pv_adjustable import PvRecord
|
||||
from epics import PV
|
||||
from ..devices_general.utilities import Changer
|
||||
@@ -47,6 +47,82 @@ def addPvRecordToSelf(
|
||||
print(f"Warning! Could not find PV {name} (Id:{pvsetname} RB:{pvreadbackname})")
|
||||
|
||||
|
||||
class DoubleCrystalMono(Assembly):
|
||||
def __init__(self, pvname, name=None, energy_sp=None, energy_rb=None):
|
||||
super().__init__(name=name)
|
||||
self.pvname = pvname
|
||||
self._append(MotorRecord_new, pvname + ":RX12", name="theta")
|
||||
self._append(MotorRecord_new, pvname + ":TX12", name="x")
|
||||
self._append(MotorRecord_new, pvname + ":T2", name="gap")
|
||||
self._append(MotorRecord_new, pvname + ":RZ1", name="roll1")
|
||||
self._append(MotorRecord_new, pvname + ":RZ2", name="roll2")
|
||||
self._append(MotorRecord_new, pvname + ":RX2", name="pitch2")
|
||||
self._append(PvRecord, pvsetname=energy_sp,pvreadbackname=energy_rb, accuracy=0.5,name="energy")
|
||||
self.moving = PV(Id + ":MOVING")
|
||||
self._stop = PV(Id + ":STOP.PROC")
|
||||
|
||||
def move_and_wait(self, value, checktime=0.01, precision=0.5):
|
||||
self.energy.set_target_value(value)
|
||||
while abs(self.wait_for_valid_value() - value) > precision:
|
||||
sleep(checktime)
|
||||
|
||||
def set_target_value(self, value, hold=False):
|
||||
changer = lambda value: self.move_and_wait(value)
|
||||
return Changer(
|
||||
target=value, parent=self, changer=changer, hold=hold, stopper=self.stop
|
||||
)
|
||||
|
||||
def stop(self):
|
||||
self._stop.put(1)
|
||||
|
||||
def get_current_value(self):
|
||||
currentenergy = self.energy.get_current_value()
|
||||
return currentenergy
|
||||
|
||||
def wait_for_valid_value(self):
|
||||
tval = np.nan
|
||||
while not np.isfinite(tval):
|
||||
tval = self.energy.get_current_value()
|
||||
return tval
|
||||
|
||||
def set_current_value(self, value):
|
||||
self.energy.set_current_value(value)
|
||||
|
||||
def get_moveDone(self):
|
||||
inmotion = int(self.moving.get())
|
||||
return inmotion
|
||||
|
||||
# spec-inspired convenience methods
|
||||
def mv(self, value):
|
||||
self._currentChange = self.set_target_value(value)
|
||||
|
||||
def wm(self, *args, **kwargs):
|
||||
return self.get_current_value(*args, **kwargs)
|
||||
|
||||
def mvr(self, value, *args, **kwargs):
|
||||
|
||||
if self.get_moveDone == 1:
|
||||
startvalue = self.get_current_value(*args, **kwargs)
|
||||
else:
|
||||
startvalue = self.get_current_value(*args, **kwargs)
|
||||
self._currentChange = self.set_target_value(value + startvalue, *args, **kwargs)
|
||||
|
||||
def wait(self):
|
||||
self._currentChange.wait()
|
||||
|
||||
def __str__(self):
|
||||
s = "**Double crystal monochromator**\n\n"
|
||||
motors = "theta gap x roll1 roll2 pitch2 energy".split()
|
||||
for motor in motors:
|
||||
s += " - %s = %.4f\n" % (motor, getattr(self, motor).get_current_value())
|
||||
return s
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
def __call__(self, value):
|
||||
self._currentChange = self.set_target_value(value)
|
||||
|
||||
class Double_Crystal_Mono:
|
||||
def __init__(self, Id, name=None, energy_sp=None, energy_rb=None):
|
||||
self.Id = Id
|
||||
|
||||
@@ -0,0 +1,126 @@
|
||||
from ..devices_general.motors import MotorRecord, MotorRecord_new
|
||||
from ..devices_general.adjustable import PvRecord
|
||||
from epics import PV
|
||||
from ..devices_general.utilities import Changer
|
||||
from time import sleep
|
||||
import numpy as np
|
||||
from ..aliases import Alias, append_object_to_object
|
||||
from ..devices_general.adjustable import (
|
||||
PvEnum,
|
||||
spec_convenience,
|
||||
default_representation,
|
||||
)
|
||||
from ..devices_general.utilities import Changer
|
||||
from ..elements.assembly import Assembly
|
||||
|
||||
|
||||
@spec_convenience
|
||||
class DoubleCrystalMono(Assembly):
|
||||
def __init__(
|
||||
self,
|
||||
pvname,
|
||||
name=None,
|
||||
energy_sp="SAROP21-ARAMIS:ENERGY_SP",
|
||||
energy_rb="SAROP21-ARAMIS:ENERGY",
|
||||
):
|
||||
super().__init__(name=name)
|
||||
self.pvname = pvname
|
||||
self._append(
|
||||
MotorRecord_new,
|
||||
pvname + ":RX12",
|
||||
name="theta",
|
||||
is_setting=True,
|
||||
view_toplevel_only=True,
|
||||
)
|
||||
self._append(
|
||||
MotorRecord_new,
|
||||
pvname + ":TX12",
|
||||
name="x",
|
||||
is_setting=True,
|
||||
view_toplevel_only=True,
|
||||
)
|
||||
self._append(
|
||||
MotorRecord_new,
|
||||
pvname + ":T2",
|
||||
name="gap",
|
||||
is_setting=True,
|
||||
view_toplevel_only=True,
|
||||
)
|
||||
self._append(
|
||||
MotorRecord_new,
|
||||
pvname + ":RZ1",
|
||||
name="roll1",
|
||||
is_setting=True,
|
||||
view_toplevel_only=True,
|
||||
)
|
||||
self._append(
|
||||
MotorRecord_new,
|
||||
pvname + ":RZ2",
|
||||
name="roll2",
|
||||
is_setting=True,
|
||||
view_toplevel_only=True,
|
||||
)
|
||||
self._append(
|
||||
MotorRecord_new,
|
||||
pvname + ":RX2",
|
||||
name="pitch2",
|
||||
is_setting=True,
|
||||
view_toplevel_only=True,
|
||||
)
|
||||
self._append(
|
||||
PvRecord, energy_sp, pvreadbackname=energy_rb, accuracy=0.5, name="energy"
|
||||
)
|
||||
self.settings.append(self)
|
||||
|
||||
def set_target_value(self, *args, **kwargs):
|
||||
return self.energy.set_target_value(*args, **kwargs)
|
||||
|
||||
def get_current_value(self, *args, **kwargs):
|
||||
return self.energy.get_current_value(*args, **kwargs)
|
||||
|
||||
|
||||
@spec_convenience
|
||||
@default_representation
|
||||
class EcolEnergy(Assembly):
|
||||
def __init__(
|
||||
self,
|
||||
pv_val="SARCL02-MBND100:USER-ENE",
|
||||
pv_enable="SARCL02-MBND100:USER-ENA",
|
||||
pv_rb="SARCL02-MBND100:P-READ",
|
||||
pv_diff="SARCL02-MBND100:USER-ERROR",
|
||||
name=None,
|
||||
):
|
||||
super().__init__(name=name)
|
||||
self._append(PvEnum, pv_enable, name="enable_control")
|
||||
self._pv_val = PV(pv_val)
|
||||
self._pv_rb = PV(pv_rb)
|
||||
self._pv_diff = PV(pv_diff)
|
||||
|
||||
def change_energy_to(self, value, tolerance=0.5):
|
||||
self.enable_control(0)
|
||||
sleep(0.1)
|
||||
self._pv_val.put(value)
|
||||
sleep(0.1)
|
||||
self.enable_control(1)
|
||||
done = False
|
||||
sleep(0.1)
|
||||
while not done:
|
||||
sleep(0.05)
|
||||
diffabs = np.abs(self._pv_rb.get() - value)
|
||||
# diff = self._pv_diff.get()
|
||||
if diffabs < tolerance:
|
||||
diff = self._pv_diff.get()
|
||||
if diff == 0:
|
||||
done = True
|
||||
self.enable_control(0)
|
||||
|
||||
def get_current_value(self):
|
||||
return self._pv_rb.get()
|
||||
|
||||
def set_target_value(self, value, hold=False):
|
||||
""" Adjustable convention"""
|
||||
|
||||
changer = lambda value: self.change_energy_to(value)
|
||||
return Changer(
|
||||
target=value, parent=self, changer=changer, hold=hold, stopper=None
|
||||
)
|
||||
@@ -0,0 +1,48 @@
|
||||
from ..devices_general.motors import MotorRecord_new
|
||||
from ..devices_general.adjustable import PvRecord, PvEnum
|
||||
from ..elements.assembly import Assembly
|
||||
|
||||
|
||||
class OffsetMirror(Assembly):
|
||||
def __init__(self, pvname, name=None):
|
||||
super().__init__(name=name)
|
||||
self.pvname = pvname
|
||||
self._append(MotorRecord_new, self.pvname + ":W_X", name="x", is_setting=True)
|
||||
self._append(MotorRecord_new, self.pvname + ":W_Y", name="y", is_setting=True)
|
||||
self._append(MotorRecord_new, self.pvname + ":W_RX", name="rx", is_setting=True)
|
||||
self._append(MotorRecord_new, self.pvname + ":W_RZ", name="rz", is_setting=True)
|
||||
self._append(
|
||||
PvRecord,
|
||||
self.pvname + ":CURV_SP",
|
||||
pvreadbackname=self.pvname + ":CURV",
|
||||
accuracy=None,
|
||||
name="curvature",
|
||||
is_setting=True,
|
||||
)
|
||||
self._append(
|
||||
PvRecord,
|
||||
self.pvname + ":ASYMMETRY_SP",
|
||||
pvreadbackname=self.pvname + ":ASYMMETRY",
|
||||
accuracy=None,
|
||||
name="asymmetry",
|
||||
is_setting=True,
|
||||
)
|
||||
|
||||
|
||||
class OffsetMirrorsBernina(Assembly):
|
||||
def __init__(self, name=None):
|
||||
super().__init__(name=name)
|
||||
self._append(
|
||||
OffsetMirror,
|
||||
"SAROP21-OOMV092",
|
||||
name="mirr1",
|
||||
is_setting=True,
|
||||
view_toplevel_only=False,
|
||||
)
|
||||
self._append(
|
||||
OffsetMirror,
|
||||
"SAROP21-OOMV096",
|
||||
name="mirr2",
|
||||
is_setting=True,
|
||||
view_toplevel_only=False,
|
||||
)
|
||||
Reference in New Issue
Block a user