improved dap a bit, added flighttube slits for xrd.

This commit is contained in:
2025-03-24 22:01:18 +01:00
parent 0d11fea537
commit fed6eb3ed1
6 changed files with 427 additions and 19 deletions
+34 -9
View File
@@ -119,6 +119,13 @@ class Jungfrau(Assembly):
is_display=True,
)
self._last_dap_req_time = 0
self._append(
AdjustableFS,
'/sf/bernina/config/eco/reference_values/dap_settings',
name="_dap_settings_storage",
is_display=False,
is_setting=False,
)
self._append(
AdjustableGetSet,
self.get_dap_settings,
@@ -133,6 +140,7 @@ class Jungfrau(Assembly):
is_setting_children=True,
name="settings_dap",
)
if config_adj:
self._append(
@@ -152,6 +160,14 @@ class Jungfrau(Assembly):
is_display="recursive",
)
def set_dap_rois(self,*rois):
tmp = self.settings_dap._base_dict()
tmp['roi_x1']=[roi[0] for roi in rois if roi]
tmp['roi_x2']=[roi[1] for roi in rois if roi]
tmp['roi_y1']=[roi[2] for roi in rois if roi]
tmp['roi_y2']=[roi[3] for roi in rois if roi]
self.settings_dap._base_dict(tmp)
def _set_trigger_enable(self, value):
if value:
self.trigger.set_target_value(self._trigger_on).wait()
@@ -216,16 +232,24 @@ class Jungfrau(Assembly):
else:
return f"aux/{dest.name}"
def get_dap_settings(self):
if 5 < (time.time() - self._last_dap_req_time):
self._last_dap_message = requests.get(
f"{self.broker_address_aux}/get_dap_settings",
json={"detector_name": self.jf_id},
).json()
self._last_dap_req_time = time.time()
def get_dap_settings(self, force=False):
if self._last_dap_message["status"] == "ok":
return self._last_dap_message["parameters"]
if force:
if 5 < (time.time() - self._last_dap_req_time):
self._last_dap_message = requests.get(
f"{self.broker_address_aux}/get_dap_settings",
json={"detector_name": self.jf_id},
).json()
self._last_dap_req_time = time.time()
if self._last_dap_message["status"] == "ok":
self._dap_settings_storage.set_target_value(self._last_dap_message["parameters"]).wait()
return self._last_dap_message["parameters"]
else:
val = self._dap_settings_storage.get_current_value()
if not val:
val = self.get_dap_settings(force=True)
return val
def set_dap_settings(self, dap_setting_dict):
# print("Setting not implmented yet!")
@@ -235,6 +259,7 @@ class Jungfrau(Assembly):
json={"detector_name": self.jf_id, "parameters": dap_setting_dict},
).json()
if m["status"] == "ok":
self._dap_settings_storage.set_target_value(dap_setting_dict).wait()
return m
def get_detector_frequency(self):
+61
View File
@@ -900,6 +900,7 @@ class MotorRecord(Assembly):
# alias_fields={"readback": "RBV"},
alias_fields={},
backlash_definition=False,
resolution_pars = False,
is_psi_mforce=False,
schneider_config=None,
expect_bad_limits=True,
@@ -992,6 +993,66 @@ class MotorRecord(Assembly):
is_setting=True,
)
if resolution_pars:
self._append(
AdjustablePv,
self.pvname + ".MRES",
name="motor_resolution",
is_setting=True,
is_display=False,
)
self._append(
AdjustablePv,
self.pvname + ".ERES",
name="encoder_resolution",
is_setting=True,
is_display=False,
)
self._append(
AdjustablePv,
self.pvname + ".RRES",
name="readback_resolution",
is_setting=True,
is_display=False,
)
self._append(
AdjustablePvEnum,
self.pvname + ".UEIP",
name="use_encoder",
is_setting=True,
is_display=False,
)
self._append(
AdjustablePvEnum,
self.pvname + ".URIP",
name="use_readback",
is_setting=True,
is_display=False,
)
self._append(
AdjustablePv,
self.pvname + ".RDBD",
name="retry_deadband",
is_setting=True,
is_display=False,
)
self._append(
AdjustablePv,
self.pvname + ".RTRY",
name="retry_max",
is_setting=True,
is_display=False,
)
self._append(
DetectorPvData,
self.pvname + ".RCNT",
name="retry_count",
is_setting=False,
is_display=False,
)
if has_park_pv:
self._append(
AdjustablePv,
@@ -6,6 +6,8 @@ from eco.endstations.bernina_sample_environments import (
)
from eco.epics import get_from_archive
from eco.xoptics.slits import SlitBladesGeneral
sys.path.append("..")
from ..devices_general.motors import MotorRecord, MotorRecord
from ..elements.adjustable import AdjustableMemory, AdjustableVirtual
@@ -122,6 +124,32 @@ def append_diffractometer_modules(obj, configuration):
)
obj.set_base_off = DeltaTauCurrOff("SARES22-GPS:asyn2.AOUT")
if hasattr(configuration, "detector_flighttube"):
if configuration.detector_flighttube():
### slit close to sample
# up down according to You-B geometry
obj._append(
SlitBladesGeneral,
def_blade_up={"args": [MotorRecord,obj.pvname + ":MOT_SLT_T_X2"], "kwargs": {'resolution_pars':True, 'backlash_definition':True}},
def_blade_down={"args": [MotorRecord,obj.pvname + ":MOT_SLT_T_X1"], "kwargs": {'resolution_pars':True, 'backlash_definition':True}},
def_blade_left={"args": [MotorRecord,obj.pvname + ":MOT_SLT_T_Y2"], "kwargs": {'resolution_pars':True, 'backlash_definition':True}},
def_blade_right={"args": [MotorRecord,obj.pvname + ":MOT_SLT_T_Y1"], "kwargs": {'resolution_pars':True, 'backlash_definition':True}},
name='slit_sam'
)
### slit close to detector
# up down according to You-B geometry
obj._append(
SlitBladesGeneral,
def_blade_up={"args": [MotorRecord,obj.pvname + ":MOT_SLT_C_X2"], "kwargs": {'resolution_pars':True, 'backlash_definition':True}},
def_blade_down={"args": [MotorRecord,obj.pvname + ":MOT_SLT_C_X1"], "kwargs": {'resolution_pars':True, 'backlash_definition':True}},
def_blade_left={"args": [MotorRecord,obj.pvname + ":MOT_SLT_C_Y2"], "kwargs": {'resolution_pars':True, 'backlash_definition':True}},
def_blade_right={"args": [MotorRecord,obj.pvname + ":MOT_SLT_C_Y1"], "kwargs": {'resolution_pars':True, 'backlash_definition':True}},
name='slit_det'
)
# missing: slits of flight tube
obj.set_det_slits_off = DeltaTauCurrOff("SARES21-XRD:asyn2.AOUT")
if configuration.arm():
obj._append(
MotorRecord,
+116 -10
View File
@@ -1,4 +1,6 @@
from eco.loptics.position_monitors import CameraPositionMonitor
from eco.motion.coordinate_transformation import CartCooRotated
from ..elements.assembly import Assembly
from functools import partial
from ..devices_general.motors import (
@@ -70,6 +72,83 @@ class IncouplingCleanBernina(Assembly):
is_display=True,
)
class MIRVirtualStages(Assembly):
def __init__(self, name=None, nx=None, nz=None,mx=None, mz=None):
super().__init__(name=name)
self._nx = nx
self._nz = nz
self._mx = mx
self._mz = mz
self._append(
AdjustableFS,
"/photonics/home/gac-bernina/eco/configuration/p21954_lens_z0",
name="offset_lens_z",
default_value=0,
is_setting=True,
)
self._append(
AdjustableFS,
"/photonics/home/gac-bernina/eco/configuration/p21954_lens_x0",
name="offset_lens_x",
default_value=0,
is_setting=True,
)
self._append(
AdjustableFS,
"/photonics/home/gac-bernina/eco/configuration/p21954_par_z0",
name="offset_par_z",
default_value=0,
is_setting=True,
)
self._append(
AdjustableFS,
"/photonics/home/gac-bernina/eco/configuration/p21954_mir_z0",
name="offset_mir_z",
default_value=0,
is_setting=True,
)
def get_focus_lens_z(nx, nz):
return nz - self.offset_lens_z()
def set_focus_lens_z(z):
nx = self.offset_lens_x() - z*np.tan(np.deg2rad(14.1))
nz = self.offset_lens_z() + z
return nx, nz
self._append(
AdjustableVirtual,
[nx, nz],
get_focus_lens_z,
set_focus_lens_z,
reset_current_value_to=True,
name="focus_lens",
)
def get_focus_par_z(mx, mz):
return mz - self.offset_par_z()
def set_focus_par_z(z):
mx = self.offset_par_z() + z
mz = self.offset_mir_z() + z
return mx, mz
self._append(
AdjustableVirtual,
[mx, mz],
get_focus_par_z,
set_focus_par_z,
reset_current_value_to=True,
name="focus_par",
)
def set_offsets_to_current_value(self):
self.offset_lens_x.mv(self._nx())
self.offset_lens_z.mv(self._nz())
self.offset_par_z.mv(self._mx())
self.offset_mir_z.mv(self._mz())
class MidIR(Assembly):
def __init__(
@@ -102,13 +181,20 @@ class MidIR(Assembly):
is_setting=True,
is_display=True,
)
#self._append(
# SmaractRecord,
# "SARES23-USR:MOT_7",
# name="mirr_z",
# is_setting=True,
# is_display=True,
#)
self._append(
MotorRecord,
"SARES20-MF1:MOT_16",
name="polariser",
is_setting=True,
is_display=True,
)
self._append(
SmaractRecord,
"SARES23-USR:MOT_4",
name="mirr_z",
is_setting=True,
is_display=True,
)
self._append(
MotorRecord,
"SLAAR21-LMTS-SMAR1:MOT_2",
@@ -118,7 +204,7 @@ class MidIR(Assembly):
)
self._append(
MotorRecord,
"SARES23-USR:MOT_2",
"SARES23-USR:MOT_5",
name="power_check",
is_setting=True,
is_display=True,
@@ -218,9 +304,30 @@ class MidIR(Assembly):
"SARES20-CEP01:spectrometer_ratio",
cachannel=None,
name="spectrometer_ratio",
is_setting=False,
is_setting=False,
is_display=True,
)
# Virtual stages ###
self._append(
CartCooRotated,
x_adj=self.x,
y_adj=self.y,
z_adj=self.z,
names_rotated_axes=['xlens','ylens','zlens'],
file_rotation='/photonics/home/gac-bernina/eco/configuration/p21954_lens_stage_rotation',
name='lens_beam_direction'
)
self._append(
MIRVirtualStages,
name="virtual_stages",
nx=self.x,
nz=self.z,
mx=self.mirr_z,
mz=self.z,
is_setting=False,
)
self._append(
DetectorBsStream,
"SARES20-CEP01:spectrometer_correlation",
@@ -301,7 +408,6 @@ class MidIR(Assembly):
except Exception as e:
print(f"Timetool pv writing pipeline initialization failed with: \n{e}")
class Spectrometer(Assembly):
def __init__(self, pvname, name=None):
super().__init__(name=name)
+100
View File
@@ -0,0 +1,100 @@
from eco.elements.adjustable import AdjustableVirtual, AdjustableFS
from scipy.spatial.transform import Rotation
from eco import Assembly
class CartCooRotated(Assembly):
def __init__(self,
x_adj=AdjustableFS('./delme_x',default_value=0,name='x'),
y_adj=AdjustableFS('./delme_y',default_value=0,name='y'),
z_adj=AdjustableFS('./delme_z',default_value=0,name='z'),
euler_seq = 'xyz',
euler_angles_deg = [0,0,0],
file_rotation='./delme_rotation',
names_rotated_axes = ['xp','yp','zp'],
change_simultaneously = True,
reset_current_value_to = False,
check_limits= False,
append_aliases=False,
name=None):
super().__init__(name=name)
self._append(AdjustableFS,
file_rotation,
default_value={'euler_sequence':euler_seq, 'euler_angles_deg':euler_angles_deg},
name='rotdef')
# self._x = x_adj
# self._y = y_adj
# self._z = z_adj
self._append(x_adj,name='_x',is_setting=True,is_display=False)
self._append(y_adj,name='_y',is_setting=True,is_display=False)
self._append(z_adj,name='_z',is_setting=True,is_display=False)
self._append(AdjustableVirtual,
[self._x,self._y,self._z],
lambda x,y,z: self.get_rotated_coo(x,y,z)[0],
lambda xp: self.get_base_coo(xp=xp),
change_simultaneously=change_simultaneously,
reset_current_value_to=reset_current_value_to,
check_limits=check_limits,
append_aliases=append_aliases,
is_status=True,
is_setting=False,
is_display=True,
name=names_rotated_axes[0])
self._append(AdjustableVirtual,
[self._x,self._y,self._z],
lambda x,y,z: self.get_rotated_coo(x,y,z)[1],
lambda yp: self.get_base_coo(yp=yp),
change_simultaneously=change_simultaneously,
reset_current_value_to=reset_current_value_to,
check_limits=check_limits,
append_aliases=append_aliases,
is_status=True,
is_setting=False,
is_display=True,
name=names_rotated_axes[1])
self._append(AdjustableVirtual,
[self._x,self._y,self._z],
lambda x,y,z: self.get_rotated_coo(x,y,z)[2],
lambda zp: self.get_base_coo(zp=zp),
change_simultaneously=change_simultaneously,
reset_current_value_to=reset_current_value_to,
check_limits=check_limits,
append_aliases=append_aliases,
is_status=True,
is_setting=False,
is_display=True,
name=names_rotated_axes[2])
self._adjs_rotated_axes = [self.__dict__[tname] for tname in names_rotated_axes]
@property
def rotation(self):
euler = self.rotdef.get_current_value()
return Rotation.from_euler(euler['euler_sequence'],euler['euler_angles_deg'],degrees=True)
def get_rotated_coo(self,x=None,y=None,z=None):
if x is None:
x = self._x.get_current_value()
if y is None:
y = self._y.get_current_value()
if z is None:
z = self._z.get_current_value()
return tuple(self.rotation.inv().apply([x,y,z]))
def get_base_coo(self,xp=None,yp=None,zp=None):
if xp is None:
xp = self._adjs_rotated_axes[0].get_current_value()
if yp is None:
yp = self._adjs_rotated_axes[1].get_current_value()
if zp is None:
zp = self._adjs_rotated_axes[2].get_current_value()
return tuple(self.rotation.apply([xp,yp,zp]))
+88
View File
@@ -0,0 +1,88 @@
import gspread
import numpy as np
class RuntableGsheet:
def __init__(self,sheet,
wstitle_available_keys='Available keys',
range_available_keys = ['A2','A10000000'],
wstitle_run_table='Custom table',
range_run_table_keys=['A1','ZZZ1'],
name_delimiter='/',
remove_leading=''):
self._spreadsheet = sheet
self._wstitle_available_keys = wstitle_available_keys
self._range_available_keys = range_available_keys
self._wstitle_run_table = wstitle_run_table
self._range_run_table_keys = range_run_table_keys
self._name_delimiter = name_delimiter
self._remove_leading = remove_leading
def require_worksheets(self):
tls = [tmp.title for tmp in self._spreadsheet.worksheets()]
for title in [self._wstitle_available_keys, self._wstitle_run_table]:
if not title in tls:
self._spreadsheet.add_worksheet(title,1,1)
def get_available_keys(self):
ks = self._spreadsheet.worksheet(self._wstitle_available_keys).get_values(':'.join(self._range_available_keys))
return [x for xs in ks for x in xs]
def set_available_keys(self,keys):
rng = gspread.utils.a1_range_to_grid_range(':'.join(self._range_available_keys))
shape = (rng['endColumnIndex']-rng['startColumnIndex'], rng['endRowIndex']-rng['startRowIndex'])
cells = []
nrowstot = 0
ncolstot = 0
for i,k in enumerate(keys):
ti = np.unravel_index(i, shape, order='C')
cells.append(
gspread.Cell(
ti[1]+rng['startRowIndex']+1,
ti[0]+rng['startColumnIndex']+1,
k.split(self._remove_leading)[1]))
nrowstot = int(max(nrowstot,ti[1]+rng['startRowIndex']+1))
ncolstot = int(max(ncolstot,ti[0]+rng['startColumnIndex']+1))
if cells:
ws = self._spreadsheet.worksheet(self._wstitle_available_keys)
nr = ws.row_count
if nr < nrowstot:
ws.add_rows(nrowstot-nr)
nc = ws.col_count
if nc < ncolstot:
ws.add_cols(ncolstot-nc)
ws.update_cells(cells)
def fill_run_table_data(self, table):
cell_list = self._spreadsheet.worksheet(self._wstitle_run_table).range(':'.join(self._range_run_table_keys))
set_cells= []
nrowstot = 0
ncolstot = 0
test_keys = [tk.split(self._remove_leading)[1] for tk in table.keys()]
for cell in cell_list:
tstr = cell.value
if not isinstance(tstr,str):
continue
tstr = tstr.split(self._name_delimiter)[0].strip()
if tstr in test_keys:
set_vals = table[tstr]
for n,set_val in enumerate(set_vals):
set_cells.append(gspread.Cell(cell.row + n + 1, cell.col, set_val))
nrowstot = int(max(nrowstot,cell.row + n + 1))
ncolstot = int(max(ncolstot,cell.col))
if set_cells:
ws = self._spreadsheet.worksheet(self._wstitle_run_table)
nr = ws.row_count
if nr < nrowstot:
ws.add_rows(nrowstot-nr)
nc = ws.col_count
if nc < ncolstot:
ws.add_cols(ncolstot-nc)
ws.update_cells(set_cells)