attocube axes are scannable now + added axis.config
This commit is contained in:
111
attocube.py
111
attocube.py
@@ -1,7 +1,10 @@
|
|||||||
from distutils.command.install_egg_info import to_filename
|
from distutils.command.install_egg_info import to_filename
|
||||||
from re import S
|
from re import S
|
||||||
|
from shutil import move
|
||||||
|
from tarfile import is_tarfile
|
||||||
import time
|
import time
|
||||||
import subprocess
|
import subprocess
|
||||||
|
from turtle import pos
|
||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
from enum import IntEnum
|
from enum import IntEnum
|
||||||
from epics import ca
|
from epics import ca
|
||||||
@@ -38,16 +41,41 @@ class AttocubeStage(BaseDevice):
|
|||||||
to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()}
|
to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()}
|
||||||
return printable_dict(to_print, head)
|
return printable_dict(to_print, head)
|
||||||
|
|
||||||
|
# Commments after talking to Sven:
|
||||||
|
# 1. give values in init and then fill self.config
|
||||||
|
# 2. set_target_value() is the method for the scanner. Therefore, update it so that it has move() inside it.
|
||||||
|
# 3. is_moving() is the other part for the scanner, it will need updating as well
|
||||||
|
# 4. after that scanning should work properly
|
||||||
|
|
||||||
|
# 5. For hole drilling something like this would make sense:
|
||||||
|
# t = daq.aquire(...)
|
||||||
|
# for pos in []:
|
||||||
|
# mot.set_target_value(pos).wait()
|
||||||
|
# sleep()
|
||||||
|
# t.stop()
|
||||||
|
# where .acqure() has a the event_map - basically for every pulse id it shows whether the event was on or off. this way we can search for pulse ids for every drilling shot if we wanted to.
|
||||||
|
# event map channel is probably somewhere on the list when typing: " ioc records .*Evt.* "
|
||||||
|
|
||||||
class AttocubeAxis(Adjustable):
|
class AttocubeAxis(Adjustable):
|
||||||
|
|
||||||
def __init__(self, ID, name=None, units=None, internal=False):
|
def __init__(self, ID, name=None, units=None, internal=False, tolerance=0.3, timeout=300.0, pos_check_delay=0.15, target_gnd=None, move_attempts = 5, move_attempt_sleep=4, verbose_move = False, ignore_limits=True, confirm_move= True):
|
||||||
super().__init__(ID, name=name, units=units, internal=internal)
|
super().__init__(ID, name=name, units=units, internal=internal)
|
||||||
|
|
||||||
self.wait_time = 0.1
|
self.wait_time = 0.1
|
||||||
self.timeout = 60
|
|
||||||
self._move_requested = False
|
self._move_requested = False
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
|
self.config = SimpleNamespace(
|
||||||
|
tolerance = tolerance,
|
||||||
|
timeout = timeout,
|
||||||
|
pos_check_delay = pos_check_delay,
|
||||||
|
target_gnd = target_gnd,
|
||||||
|
move_attempts = move_attempts,
|
||||||
|
move_attempt_sleep = move_attempt_sleep,
|
||||||
|
verbose_move = verbose_move,
|
||||||
|
ignore_limits = ignore_limits,
|
||||||
|
confirm_move = confirm_move,
|
||||||
|
)
|
||||||
|
|
||||||
self.pvs = SimpleNamespace(
|
self.pvs = SimpleNamespace(
|
||||||
drive = PV(ID + "-SET_TARGET"),
|
drive = PV(ID + "-SET_TARGET"),
|
||||||
@@ -160,35 +188,6 @@ class AttocubeAxis(Adjustable):
|
|||||||
else:
|
else:
|
||||||
return self.pvs.drive.get()
|
return self.pvs.drive.get()
|
||||||
|
|
||||||
def set_target_value(self, value):
|
|
||||||
print(f"moving to {value}")
|
|
||||||
wait_time = self.wait_time
|
|
||||||
timeout = self.timeout + time.time()
|
|
||||||
|
|
||||||
self._move_requested = True
|
|
||||||
self.pvs.drive.put(value, wait=True)
|
|
||||||
|
|
||||||
# Move button must be pressed to make the move
|
|
||||||
self.set_move_allowed(True)
|
|
||||||
|
|
||||||
# wait for start
|
|
||||||
while self._move_requested and not self.is_moving():
|
|
||||||
time.sleep(wait_time)
|
|
||||||
if time.time() >= timeout:
|
|
||||||
tname = typename(self)
|
|
||||||
self.stop()
|
|
||||||
raise AttocubeError(f"starting to move {tname} \"{self.name}\" to {value} {self.pvs.units.char_value} timed out")
|
|
||||||
|
|
||||||
# wait for move done
|
|
||||||
while self._move_requested and self.is_moving():
|
|
||||||
if self.is_at_target(): # holding == arrived at target!
|
|
||||||
break
|
|
||||||
time.sleep(wait_time)
|
|
||||||
|
|
||||||
self._move_requested = False
|
|
||||||
print("move done")
|
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._move_requested = False
|
self._move_requested = False
|
||||||
self.pvs.stop.put(1, wait=True)
|
self.pvs.stop.put(1, wait=True)
|
||||||
@@ -197,12 +196,14 @@ class AttocubeAxis(Adjustable):
|
|||||||
return self.pvs.output_RB.value == 1
|
return self.pvs.output_RB.value == 1
|
||||||
|
|
||||||
def is_moving(self):
|
def is_moving(self):
|
||||||
return self.pvs.moving_status.value == 1
|
return self.is_at_target()
|
||||||
|
|
||||||
def is_at_target(self,from_pv=False,tolerance = 0.3):
|
def is_at_target(self,from_pv=False, tolerance = None):
|
||||||
"""Check whether target was reached.
|
"""Check whether target was reached.
|
||||||
If from_pv is True TARGET_REACHED_RB is used, else it's calculated as diffrence between the readback and set value within tolerance.
|
If from_pv is True TARGET_REACHED_RB is used, else it's calculated as diffrence between the readback and set value within tolerance.
|
||||||
Default is False, because otherwise tolerance can only be set and read out as an integer."""
|
Default is False, because otherwise tolerance can only be set and read out as an integer."""
|
||||||
|
if tolerance == None:
|
||||||
|
tolerance = self.config.tolerance
|
||||||
if from_pv == True:
|
if from_pv == True:
|
||||||
return self.pvs.target_reached.value == 1
|
return self.pvs.target_reached.value == 1
|
||||||
else:
|
else:
|
||||||
@@ -246,8 +247,7 @@ class AttocubeAxis(Adjustable):
|
|||||||
# 1. Set SET_TARGET_RANGE as a float or somehow alow decimals to be entered
|
# 1. Set SET_TARGET_RANGE as a float or somehow alow decimals to be entered
|
||||||
# 2. Soft epics limits
|
# 2. Soft epics limits
|
||||||
|
|
||||||
|
def set_target_value(self, val, relative=False, wait=True):
|
||||||
def move(self, val, relative=False, wait=True, ignore_limits=True, confirm_move=True, timeout=300.0, pos_check_delay=0.15, target_gnd=None, move_attempts = 5, move_attempt_sleep=4, verbose = False, tolerance=0.3):
|
|
||||||
"""
|
"""
|
||||||
moves Attocube drive to position (emulating pyepics Motor class)
|
moves Attocube drive to position (emulating pyepics Motor class)
|
||||||
|
|
||||||
@@ -256,13 +256,6 @@ class AttocubeAxis(Adjustable):
|
|||||||
val value to move to (float) [Must be provided]
|
val value to move to (float) [Must be provided]
|
||||||
relative move relative to current position (T/F) [F]
|
relative move relative to current position (T/F) [F]
|
||||||
wait whether to wait for move to complete (T/F) [F]
|
wait whether to wait for move to complete (T/F) [F]
|
||||||
ignore_limits try move without regard to limits (no epics limits implemented yet) (T/F) [T]
|
|
||||||
confirm_move try to confirm that move has begun (T/F) [F]
|
|
||||||
timeout max time for move to complete (in seconds) [300]
|
|
||||||
pos_check_delay delay before checkig motor in position (in seconds) [0.15]
|
|
||||||
move_attempts how many times should the move be attempted if not in limits (attocube often needs to be poked a few times for the last 1um) [5]
|
|
||||||
move_attempt_sleep how long to wait before another move poke attept is done (in seconds) [4]
|
|
||||||
tolerance when to consider target destination reached (in microns) [0.3]
|
|
||||||
|
|
||||||
return values:
|
return values:
|
||||||
==============
|
==============
|
||||||
@@ -301,18 +294,18 @@ class AttocubeAxis(Adjustable):
|
|||||||
if relative:
|
if relative:
|
||||||
val += self.pvs.drive.get()
|
val += self.pvs.drive.get()
|
||||||
|
|
||||||
if not ignore_limits:
|
if not self.config.ignore_limits:
|
||||||
if not self.within_epics_limits(val):
|
if not self.within_epics_limits(val):
|
||||||
return OUTSIDE_LIMITS
|
return OUTSIDE_LIMITS
|
||||||
|
|
||||||
put_stat = self.pvs.drive.put(val, wait=wait, timeout=timeout)
|
put_stat = self.pvs.drive.put(val, wait=wait, timeout=self.config.timeout)
|
||||||
|
|
||||||
if not self.is_output_on():
|
if not self.is_output_on():
|
||||||
self.enable_output(verbose=False)
|
self.enable_output(verbose=False)
|
||||||
|
|
||||||
if target_gnd == True or target_gnd == 1:
|
if self.config.target_gnd == True or self.config.target_gnd == 1:
|
||||||
self.set_ground_on_target(True,verbose=False)
|
self.set_ground_on_target(True,verbose=False)
|
||||||
elif target_gnd == False or target_gnd == 0:
|
elif self.config.target_gnd == False or self.config.target_gnd == 0:
|
||||||
self.set_ground_on_target(False,verbose=False)
|
self.set_ground_on_target(False,verbose=False)
|
||||||
|
|
||||||
self.allow_move()
|
self.allow_move()
|
||||||
@@ -327,10 +320,10 @@ class AttocubeAxis(Adjustable):
|
|||||||
return UNKNOWN_ERROR
|
return UNKNOWN_ERROR
|
||||||
|
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
tstart = t0 + min(timeout, 10)
|
tstart = t0 + min(self.config.timeout, 10)
|
||||||
tout = t0 + timeout
|
tout = t0 + self.config.timeout
|
||||||
|
|
||||||
if not wait and not confirm_move:
|
if not wait and not self.config.confirm_move:
|
||||||
return EXECUTED
|
return EXECUTED
|
||||||
|
|
||||||
if not wait:
|
if not wait:
|
||||||
@@ -340,25 +333,25 @@ class AttocubeAxis(Adjustable):
|
|||||||
return TIMEOUT
|
return TIMEOUT
|
||||||
|
|
||||||
# Wait before checking if target value reached. It's necessary as sometimes this PV takes a while to set for the new target value.
|
# Wait before checking if target value reached. It's necessary as sometimes this PV takes a while to set for the new target value.
|
||||||
time.sleep(pos_check_delay)
|
time.sleep(self.config.pos_check_delay)
|
||||||
|
|
||||||
while self.is_at_target(tolerance=tolerance) == False and time.time() <= tout:
|
while self.is_at_target() == False and time.time() <= tout:
|
||||||
# If the value is near the last 2um. Move the cube to setpoint a few more times to get to the right position.
|
# If the value is near the last 2um. Move the cube to setpoint a few more times to get to the right position.
|
||||||
if self.is_at_target( tolerance = 2):
|
if self.is_at_target( tolerance = 2):
|
||||||
for attempt in range(move_attempts):
|
for attempt in range(self.config.move_attempts):
|
||||||
if verbose:
|
if self.config.verbose_move:
|
||||||
print(f'move attempt: {attempt+1}')
|
print(f'move attempt: {attempt+1}')
|
||||||
self.pvs.move.put(1, wait=True)
|
self.pvs.move.put(1, wait=True)
|
||||||
time.sleep(move_attempt_sleep)
|
time.sleep(self.config.move_attempt_sleep)
|
||||||
if self.is_at_target(tolerance=tolerance):
|
if self.is_at_target():
|
||||||
if verbose:
|
if self.config.verbose_move:
|
||||||
print(f'Move finished. Had to poke the cube {attempt+1} times to get there though.')
|
print(f'Move finished. Had to poke the cube {attempt+1}x to get there though.')
|
||||||
return SUCCESS
|
return SUCCESS
|
||||||
|
|
||||||
print('Position reached within 2um, but not better.')
|
print('Position reached within 2um, but not better.')
|
||||||
return TIMEOUT
|
return TIMEOUT
|
||||||
|
|
||||||
if self.is_at_target(tolerance=tolerance):
|
if self.is_at_target():
|
||||||
return SUCCESS
|
return SUCCESS
|
||||||
|
|
||||||
return UNKNOWN_ERROR
|
return UNKNOWN_ERROR
|
||||||
@@ -368,8 +361,6 @@ class AttocubeAxis(Adjustable):
|
|||||||
cmd = f'caqtdm -macro "DEVICE={device}" S_ANC350.ui'
|
cmd = f'caqtdm -macro "DEVICE={device}" S_ANC350.ui'
|
||||||
return subprocess.Popen(cmd, shell=True)
|
return subprocess.Popen(cmd, shell=True)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AttocubeError(AdjustableError):
|
class AttocubeError(AdjustableError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user