Added attocube module

This commit is contained in:
2022-09-23 09:36:33 +02:00
parent 33ff2c9722
commit 362daa7846
15 changed files with 371 additions and 3 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

354
attocube.py Normal file
View File

@ -0,0 +1,354 @@
from re import S
import time
import subprocess
from types import SimpleNamespace
from enum import IntEnum
from epics import ca
# import slic
from slic.core.adjustable import Adjustable, AdjustableError
from slic.utils import typename
from slic.utils.printing import printable_dict
from slic.utils.hastyepics import get_pv as PV
# from ..basedevice import BaseDevice
from slic.core.device.basedevice import BaseDevice
class AttocubeStage(BaseDevice):
def __init__(self, name=None, **axis_ids):
self.name = name
self.axis_ids = axis_ids
self.axes = {}
for ax_name, ax_id in axis_ids.items():
record_name = f"{name}: {ax_name}"
ax = AttocubeAxis(ax_id, name=record_name)
setattr(self, ax_name, ax)
self.axes[ax_name] = ax
def __repr__(self):
tname = typename(self)
name = self.name
head = f"{tname} \"{name}\""
to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()}
return printable_dict(to_print, head)
class AttocubeAxis(Adjustable):
def __init__(self, ID, name=None, units=None, internal=False):
super().__init__(ID, name=name, units=units, internal=internal)
self.wait_time = 0.1
self.timeout = 60
self._move_requested = False
self.pvs = SimpleNamespace(
drive = PV(ID + "-SET_TARGET"),
readback = PV(ID + "-POS"),
amplitude = PV(ID + "-AMPL_SET"),
amplitude_rb = PV(ID + "-AMPL_RB"),
frequency = PV(ID + "-FREQ_SET"),
frequency_rb = PV(ID + "-FREQ_RB"),
moving_status = PV(ID + "-MOVING"),
hlm = PV(ID + ":HLM"),
llm = PV(ID + ":LLM"),
set_pos = PV(ID + ":SET_POS"),
stop = PV(ID + "-STOP_AUTO_CMD"),
move = PV(ID + "-MV_ABS_SET"),
stop_auto_cmd = PV(ID + "-STOP_AUTO_CMD"),
target_tol_rb = PV(ID + "-TARGET_RANGE"),
target_tol_set = PV(ID + "-SET_TARGET_RANGE"),
target_reached = PV(ID + "-TARGET_REACHED_RB"),
target_ground = PV(ID + "-TARGET_GND_SET"),
output_enabled = PV(ID + "-ENABLE_SET"),
twv = PV(ID + ":TWV"),
units = PV(ID + "-UNIT"),
output_RB = PV(ID + "-ENABLE_RB"),
)
@property
def units(self):
units = self._units
if units is not None:
return units
return self.pvs.units.get()
@units.setter
def units(self, value):
self._units = value
def disable_output(self,verbose=True):
old_value = self.pvs.output_RB.value
self.pvs.output_enabled.put(0)
if verbose:
if old_value == 1:
print("Output has been disabled")
else:
print("Output has been off and stays that way")
def enable_output(self,verbose=True):
old_value = self.pvs.output_RB.value
self.pvs.output_enabled.put(1)
if verbose:
if old_value == 0:
print("Output has been enabled")
else:
print("Output has been on and stays that way")
def get_ground_on_targert(self):
return self.pvs.target_ground.value
def get_amplitude(self,verbose=False):
amplitude = self.pvs.amplitude_rb.value
if verbose:
print(f'Drive amplitude is {amplitude/1000} V')
return amplitude/1000
def set_amplitude(self,value,verbose=False,check=False):
"""Set drive amplitude for the axis in Volts. Add check=True for checking if it happened (takes 2s)."""
self.pvs.amplitude.put(value*1000)
if check:
time.sleep(2)
assert self.pvs.amplitude_rb.value == value*1000, 'drive amplitude readback does not match set value'
if verbose:
print(f'Drive amplitude is set to {value} V')
def get_frequency(self,verbose=False):
frequency = self.pvs.frequency_rb.value
if verbose:
print(f'Drive frequency is {frequency} V')
return frequency
def set_frequency(self,value,verbose=False,check=False):
"""Set drive frequency for the axis in Hz. Add check=True for checking if it happened (takes 2s)."""
self.pvs.frequency.put(value)
if check:
time.sleep(2)
assert self.pvs.frequency_rb.value == value, 'drive frequency readback does not match set value'
if verbose:
print(f'Drive frequency is set to {value} Hz')
def set_ground_on_target(self,value,verbose=True):
if value == True or value == 1:
self.pvs.target_ground.put(1)
if verbose:
print("grounding upon target arrival is now active")
elif value == False or value == 0:
self.pvs.target_ground.put(0)
if verbose:
print("grounding upon target arrival has been deactivated")
else:
raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}")
def get_target_tolerance(self):
return self.pvs.target_tol_rb.value
def set_target_tolerance(self,value):
self.pvs.target_tol_set.put(value)
print(f"target tolerance set to {value} {self.pvs.units.char_value}")
def get_current_value(self, readback=True):
if readback:
return self.pvs.readback.get()
else:
return self.pvs.drive.get()
def set_target_value(self, value):
print(f"moving to {value}")
wait_time = self.wait_time
timeout = self.timeout + time.time()
self._move_requested = True
self.pvs.drive.put(value, wait=True)
# Move button must be pressed to make the move
self.set_move_allowed(True)
# wait for start
while self._move_requested and not self.is_moving():
time.sleep(wait_time)
if time.time() >= timeout:
tname = typename(self)
self.stop()
raise AttocubeError(f"starting to move {tname} \"{self.name}\" to {value} {self.pvs.units.char_value} timed out")
# wait for move done
while self._move_requested and self.is_moving():
if self.is_at_target(): # holding == arrived at target!
break
time.sleep(wait_time)
self._move_requested = False
print("move done")
def stop(self):
self._move_requested = False
self.pvs.stop.put(1, wait=True)
def is_output_on(self):
return self.pvs.output_RB.value == 1
def is_moving(self):
return self.pvs.moving_status.value == 1
def is_at_target(self):
return self.pvs.target_reached.value == 1
def allow_move(self):
self.pvs.move.put(1, wait=True)
def forbid_move(self):
self.pvs.move.put(0, wait=True)
def set_move_allowed(self,value):
if value == True or value == 1:
self.pvs.move.put(1)
elif value == False or value == 0:
self.pvs.move.put(0)
else:
raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}")
def within_epics_limits(self, val):
low, high = self.get_epics_limits()
return low <= val <= high
def get_epics_limits(self):
low = self.pvs.llm.get()
high = self.pvs.hlm.get()
return low, high
def set_epics_limits(self, low, high, relative_to_current=False):
low = -np.inf if low is None else low
high = +np.inf if high is None else high
if relative_to_current:
val = self.get_current_value()
low += val
high += val
self.pvs.llm.put(low)
self.pvs.hlm.put(high)
###
# Things to be desired from the epics module:
# 1. Set SET_TARGET_RANGE as a float or somehow alow decimals to be entered
# 2. Soft epics limits
def move(self, val, relative=False, wait=True, ignore_limits=False, confirm_move=True, timeout=300.0, pos_check_delay=0.15, target_gnd=None):
"""
moves Attocube drive to position (emulating pyepics Motor class)
arguments:
==========
val value to move to (float) [Must be provided]
relative move relative to current position (T/F) [F]
wait whether to wait for move to complete (T/F) [F]
ignore_limits try move without regard to limits (T/F) [F]
confirm_move try to confirm that move has begun (T/F) [F]
timeout max time for move to complete (in seconds) [300]
pos_check_delay delay before checkig motor in position (in seconds) [0.1]
return values:
==============
-13 : invalid value (cannot convert to float). Move not attempted.
-12 : target value outside soft limits. Move not attempted.
-11 : drive PV is not connected. Move not attempted.
-8 : move started, but timed-out.
# -7 : move started, timed-out, but appears done.
-5 : move started, unexpected return value from PV.put().
# -4 : move-with-wait finished, soft limit violation seen.
# -3 : move-with-wait finished, hard limit violation seen.
0 : move-with-wait finished OK.
0 : move-without-wait executed, move not confirmed.
1 : move-without-wait executed, move confirmed.
# 3 : move-without-wait finished, hard limit violation seen.
# 4 : move-without-wait finished, soft limit violation seen.
"""
INVALID_VALUE = -13
OUTSIDE_LIMITS = -12
NOT_CONNECTED = -11
TIMEOUT = -8
UNKNOWN_ERROR = -5
SUCCESS = 0
EXECUTED = 0
CONFIRMED = 1
PUT_SUCCESS = 1
PUT_TIMEOUT = -1
try:
val = float(val)
except Exception:
return INVALID_VALUE
if relative:
val += self.pvs.drive.get()
if not ignore_limits:
if not self.within_epics_limits(val):
return OUTSIDE_LIMITS
put_stat = self.pvs.drive.put(val, wait=wait, timeout=timeout)
if not self.is_output_on():
self.enable_output(verbose=False)
if target_gnd == True or target_gnd == 1:
self.set_ground_on_target(True,verbose=False)
elif target_gnd == False or target_gnd == 0:
self.set_ground_on_target(False,verbose=False)
self.allow_move()
if put_stat is None:
return NOT_CONNECTED
if wait and put_stat == PUT_TIMEOUT:
return TIMEOUT
if put_stat != PUT_SUCCESS:
return UNKNOWN_ERROR
t0 = time.time()
tstart = t0 + min(timeout, 10)
tout = t0 + timeout
if not wait and not confirm_move:
return EXECUTED
if not wait:
return CONFIRMED
if time.time() > tout:
return TIMEOUT
time.sleep(pos_check_delay) # Wait before checking if target value reached. It's necessary as sometimes this PV takes a while to set for the new target value.
while self.is_at_target() == False and time.time() <= tout:
ca.poll(evt=1.0e-2)
if self.is_at_target():
return SUCCESS
return UNKNOWN_ERROR
def gui(self):
device, motor = self.ID.split(":")
cmd = f'caqtdm -macro "DEVICE={device}" S_ANC350.ui'
return subprocess.Popen(cmd, shell=True)
class AttocubeError(AdjustableError):
pass

6
attocube_device_def.py Normal file
View File

@ -0,0 +1,6 @@
from attocube import AttocubeStage
attocube = AttocubeStage("SARES30-ATTOCUBE",
Z='SARES30-ATTOCUBE:A0',
X='SARES30-ATTOCUBE:A1',
)

View File

@ -73,13 +73,21 @@ from slic.devices.xoptics import slits
## Smaract stage ## Smaract stage
from smaract_device_def import smaract from smaract_device_def import smaract
# from attocube_assignment import attocube from attocube_device_def import attocube
########################################### ###########################################
instrument = "cristallina" instrument = "cristallina"
# pgroup = "p19739" # commissioning March 2022 -- July 2022 # pgroup = "p19739" # commissioning March 2022 -- July 2022
pgroup = "p20443" # commissioning Wavefront Sensor August 2022 pgroup = "p20443" # commissioning Wavefront Sensor August 2022 (active)
# pgroup = "p20558" # SwissMX commissioning 3
# pgroup = "p20557" # CrQ PMS commisioning 1
# pgroup = "p20509" # CrQ commissoing DilSc1 (active)
# pgroup = "p20519" # beamline commissioning 2
# pgroup = "p19150" # Scratch
# pgroup = "p19152" # Scratch
daq = SFAcquisition( daq = SFAcquisition(
instrument, instrument,