384 lines
13 KiB
Python
Executable File
384 lines
13 KiB
Python
Executable File
from distutils.command.install_egg_info import to_filename
|
|
from re import S
|
|
from shutil import move
|
|
from tarfile import is_tarfile
|
|
import time
|
|
import subprocess
|
|
from turtle import pos
|
|
from types import SimpleNamespace
|
|
from enum import IntEnum
|
|
from epics import ca
|
|
|
|
|
|
# import slic
|
|
from slic.core.adjustable import Adjustable, AdjustableError
|
|
from slic.utils import typename
|
|
from slic.utils.printing import printable_dict
|
|
from slic.utils.hastyepics import get_pv as PV
|
|
|
|
# from ..basedevice import BaseDevice
|
|
from slic.core.device.basedevice import BaseDevice
|
|
|
|
|
|
class AttocubeStage(BaseDevice):
|
|
def __init__(self, name=None, **axis_ids):
|
|
self.name = name
|
|
self.axis_ids = axis_ids
|
|
|
|
self.axes = {}
|
|
for ax_name, ax_id in axis_ids.items():
|
|
record_name = f"{name}: {ax_name}"
|
|
ax = AttocubeAxis(ax_id, name=record_name)
|
|
setattr(self, ax_name, ax)
|
|
self.axes[ax_name] = ax
|
|
|
|
def __repr__(self):
|
|
tname = typename(self)
|
|
name = self.name
|
|
head = f'{tname} "{name}"'
|
|
|
|
to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()}
|
|
return printable_dict(to_print, head)
|
|
|
|
|
|
# Commments after talking to Sven:
|
|
# 1. give values in init and then fill self.config
|
|
# 2. set_target_value() is the method for the scanner. Therefore, update it so that it has move() inside it.
|
|
# 3. is_moving() is the other part for the scanner, it will need updating as well
|
|
# 4. after that scanning should work properly
|
|
|
|
# 5. For hole drilling something like this would make sense:
|
|
# t = daq.aquire(...)
|
|
# for pos in []:
|
|
# mot.set_target_value(pos).wait()
|
|
# sleep()
|
|
# t.stop()
|
|
# where .acqure() has a the event_map - basically for every pulse id it shows whether the event was on or off. this way we can search for pulse ids for every drilling shot if we wanted to.
|
|
# event map channel is probably somewhere on the list when typing: " ioc records .*Evt.* "
|
|
|
|
|
|
class AttocubeAxis(Adjustable):
|
|
def __init__(
|
|
self,
|
|
ID,
|
|
name=None,
|
|
units=None,
|
|
internal=False,
|
|
tolerance=0.3,
|
|
timeout=300.0,
|
|
pos_check_delay=0.15,
|
|
target_gnd=None,
|
|
move_attempts=5,
|
|
move_attempt_sleep=4,
|
|
verbose_move=False,
|
|
ignore_limits=True,
|
|
confirm_move=True,
|
|
):
|
|
super().__init__(ID, name=name, units=units, internal=internal)
|
|
|
|
self.wait_time = 0.1
|
|
self._move_requested = False
|
|
self.timeout = timeout
|
|
|
|
self.config = SimpleNamespace(
|
|
tolerance=tolerance,
|
|
timeout=timeout,
|
|
pos_check_delay=pos_check_delay,
|
|
target_gnd=target_gnd,
|
|
move_attempts=move_attempts,
|
|
move_attempt_sleep=move_attempt_sleep,
|
|
verbose_move=verbose_move,
|
|
ignore_limits=ignore_limits,
|
|
confirm_move=confirm_move,
|
|
)
|
|
|
|
self.pvs = SimpleNamespace(
|
|
drive=PV(ID + "-SET_TARGET"),
|
|
readback=PV(ID + "-POS"),
|
|
amplitude=PV(ID + "-AMPL_SET"),
|
|
amplitude_rb=PV(ID + "-AMPL_RB"),
|
|
frequency=PV(ID + "-FREQ_SET"),
|
|
frequency_rb=PV(ID + "-FREQ_RB"),
|
|
moving_status=PV(ID + "-MOVING"),
|
|
hlm=PV(ID + ":HLM"), # not implemented yet, but hopefully will be added later
|
|
llm=PV(ID + ":LLM"), # not implemented yet, but hopefully will be added later
|
|
stop=PV(ID + "-STOP_AUTO_CMD"),
|
|
move=PV(ID + "-MV_ABS_SET"),
|
|
stop_auto_cmd=PV(ID + "-STOP_AUTO_CMD"),
|
|
target_tol_rb=PV(ID + "-TARGET_RANGE"),
|
|
target_tol_set=PV(ID + "-SET_TARGET_RANGE"),
|
|
target_reached=PV(ID + "-TARGET_REACHED_RB"),
|
|
target_ground=PV(ID + "-TARGET_GND_SET"),
|
|
output_enabled=PV(ID + "-ENABLE_SET"),
|
|
units=PV(ID + "-UNIT"),
|
|
output_RB=PV(ID + "-ENABLE_RB"),
|
|
)
|
|
|
|
@property
|
|
def units(self):
|
|
units = self._units
|
|
if units is not None:
|
|
return units
|
|
return self.pvs.units.get()
|
|
|
|
@units.setter
|
|
def units(self, value):
|
|
self._units = value
|
|
|
|
def disable_output(self, verbose=True):
|
|
old_value = self.pvs.output_RB.value
|
|
self.pvs.output_enabled.put(0)
|
|
if verbose:
|
|
if old_value == 1:
|
|
print("Output has been disabled")
|
|
else:
|
|
print("Output has been off and stays that way")
|
|
|
|
def enable_output(self, verbose=True):
|
|
old_value = self.pvs.output_RB.value
|
|
self.pvs.output_enabled.put(1)
|
|
if verbose:
|
|
if old_value == 0:
|
|
print("Output has been enabled")
|
|
else:
|
|
print("Output has been on and stays that way")
|
|
|
|
def get_ground_on_targert(self):
|
|
return self.pvs.target_ground.value
|
|
|
|
def get_amplitude(self, verbose=False):
|
|
amplitude = self.pvs.amplitude_rb.value
|
|
if verbose:
|
|
print(f"Drive amplitude is {amplitude/1000} V")
|
|
return amplitude / 1000
|
|
|
|
def set_amplitude(self, value, verbose=False, check=False):
|
|
"""Set drive amplitude for the axis in Volts. Add check=True for checking if it happened (takes 2s)."""
|
|
self.pvs.amplitude.put(value * 1000)
|
|
if check:
|
|
time.sleep(2)
|
|
assert self.pvs.amplitude_rb.value == value * 1000, "drive amplitude readback does not match set value"
|
|
if verbose:
|
|
print(f"Drive amplitude is set to {value} V")
|
|
|
|
def get_frequency(self, verbose=False):
|
|
frequency = self.pvs.frequency_rb.value
|
|
if verbose:
|
|
print(f"Drive frequency is {frequency} V")
|
|
return frequency
|
|
|
|
def set_frequency(self, value, verbose=False, check=False):
|
|
"""Set drive frequency for the axis in Hz. Add check=True for checking if it happened (takes 2s)."""
|
|
self.pvs.frequency.put(value)
|
|
if check:
|
|
time.sleep(2)
|
|
assert self.pvs.frequency_rb.value == value, "drive frequency readback does not match set value"
|
|
if verbose:
|
|
print(f"Drive frequency is set to {value} Hz")
|
|
|
|
def set_ground_on_target(self, value, verbose=True):
|
|
if value == True or value == 1:
|
|
self.pvs.target_ground.put(1)
|
|
if verbose:
|
|
print("grounding upon target arrival is now active")
|
|
elif value == False or value == 0:
|
|
self.pvs.target_ground.put(0)
|
|
if verbose:
|
|
print("grounding upon target arrival has been deactivated")
|
|
else:
|
|
raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}")
|
|
|
|
def get_target_tolerance(self):
|
|
return self.pvs.target_tol_rb.value
|
|
|
|
def set_target_tolerance(self, value):
|
|
self.pvs.target_tol_set.put(value)
|
|
print(f"target tolerance set to {value} {self.pvs.units.char_value}")
|
|
|
|
def get_current_value(self, readback=True):
|
|
if readback:
|
|
return self.pvs.readback.get()
|
|
else:
|
|
return self.pvs.drive.get()
|
|
|
|
def stop(self):
|
|
self._move_requested = False
|
|
self.pvs.stop.put(1, wait=True)
|
|
|
|
def is_output_on(self):
|
|
return self.pvs.output_RB.value == 1
|
|
|
|
def is_moving(self):
|
|
return self.is_at_target()
|
|
|
|
def is_at_target(self, from_pv=False, tolerance=None):
|
|
"""Check whether target was reached.
|
|
If from_pv is True TARGET_REACHED_RB is used, else it's calculated as diffrence between the readback and set value within tolerance.
|
|
Default is False, because otherwise tolerance can only be set and read out as an integer."""
|
|
if tolerance == None:
|
|
tolerance = self.config.tolerance
|
|
if from_pv == True:
|
|
return self.pvs.target_reached.value == 1
|
|
else:
|
|
return abs(self.pvs.readback.get() - self.pvs.drive.get()) < tolerance
|
|
|
|
def allow_move(self):
|
|
self.pvs.move.put(1, wait=True)
|
|
|
|
def forbid_move(self):
|
|
self.pvs.move.put(0, wait=True)
|
|
|
|
def set_move_allowed(self, value):
|
|
if value == True or value == 1:
|
|
self.pvs.move.put(1)
|
|
elif value == False or value == 0:
|
|
self.pvs.move.put(0)
|
|
else:
|
|
raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}")
|
|
|
|
def within_epics_limits(self, val):
|
|
low, high = self.get_epics_limits()
|
|
return low <= val <= high
|
|
|
|
def get_epics_limits(self):
|
|
low = self.pvs.llm.get()
|
|
high = self.pvs.hlm.get()
|
|
return low, high
|
|
|
|
def set_epics_limits(self, low, high, relative_to_current=False):
|
|
low = -np.inf if low is None else low
|
|
high = +np.inf if high is None else high
|
|
if relative_to_current:
|
|
val = self.get_current_value()
|
|
low += val
|
|
high += val
|
|
self.pvs.llm.put(low)
|
|
self.pvs.hlm.put(high)
|
|
|
|
###
|
|
# Things to be desired from the epics module:
|
|
# 1. Set SET_TARGET_RANGE as a float or somehow alow decimals to be entered
|
|
# 2. Soft epics limits
|
|
|
|
def set_target_value(self, val, relative=False, wait=True):
|
|
"""
|
|
moves Attocube drive to position (emulating pyepics Motor class)
|
|
|
|
arguments:
|
|
==========
|
|
val value to move to (float) [Must be provided]
|
|
relative move relative to current position (T/F) [F]
|
|
wait whether to wait for move to complete (T/F) [F]
|
|
|
|
return values:
|
|
==============
|
|
-13 : invalid value (cannot convert to float). Move not attempted.
|
|
-12 : target value outside soft limits. Move not attempted.
|
|
-11 : drive PV is not connected. Move not attempted.
|
|
-8 : move started, but timed-out.
|
|
# -7 : move started, timed-out, but appears done.
|
|
-5 : move started, unexpected return value from PV.put().
|
|
# -4 : move-with-wait finished, soft limit violation seen.
|
|
# -3 : move-with-wait finished, hard limit violation seen.
|
|
0 : move-with-wait finished OK.
|
|
0 : move-without-wait executed, move not confirmed.
|
|
1 : move-without-wait executed, move confirmed.
|
|
# 3 : move-without-wait finished, hard limit violation seen.
|
|
# 4 : move-without-wait finished, soft limit violation seen.
|
|
|
|
"""
|
|
INVALID_VALUE = -13
|
|
OUTSIDE_LIMITS = -12
|
|
NOT_CONNECTED = -11
|
|
TIMEOUT = -8
|
|
UNKNOWN_ERROR = -5
|
|
SUCCESS = 0
|
|
EXECUTED = 0
|
|
CONFIRMED = 1
|
|
|
|
PUT_SUCCESS = 1
|
|
PUT_TIMEOUT = -1
|
|
|
|
try:
|
|
val = float(val)
|
|
except Exception:
|
|
return INVALID_VALUE
|
|
|
|
if relative:
|
|
val += self.pvs.drive.get()
|
|
|
|
if not self.config.ignore_limits:
|
|
if not self.within_epics_limits(val):
|
|
return OUTSIDE_LIMITS
|
|
|
|
put_stat = self.pvs.drive.put(val, wait=wait, timeout=self.config.timeout)
|
|
|
|
if not self.is_output_on():
|
|
self.enable_output(verbose=False)
|
|
|
|
if self.config.target_gnd == True or self.config.target_gnd == 1:
|
|
self.set_ground_on_target(True, verbose=False)
|
|
elif self.config.target_gnd == False or self.config.target_gnd == 0:
|
|
self.set_ground_on_target(False, verbose=False)
|
|
|
|
self.allow_move()
|
|
|
|
if put_stat is None:
|
|
return NOT_CONNECTED
|
|
|
|
if wait and put_stat == PUT_TIMEOUT:
|
|
return TIMEOUT
|
|
|
|
if put_stat != PUT_SUCCESS:
|
|
return UNKNOWN_ERROR
|
|
|
|
t0 = time.time()
|
|
tstart = t0 + min(self.config.timeout, 10)
|
|
tout = t0 + self.config.timeout
|
|
|
|
if not wait and not self.config.confirm_move:
|
|
return EXECUTED
|
|
|
|
if not wait:
|
|
return CONFIRMED
|
|
|
|
if time.time() > tout:
|
|
return TIMEOUT
|
|
|
|
# Wait before checking if target value reached. It's necessary as sometimes this PV takes a while to set for the new target value.
|
|
time.sleep(self.config.pos_check_delay)
|
|
|
|
while self.is_at_target() == False and time.time() <= tout:
|
|
# If the value is near the last 2um. Move the cube to setpoint a few more times to get to the right position.
|
|
if self.is_at_target(tolerance=2):
|
|
for attempt in range(self.config.move_attempts):
|
|
if self.config.verbose_move:
|
|
print(f"move attempt: {attempt+1}")
|
|
self.pvs.move.put(1, wait=True)
|
|
time.sleep(self.config.move_attempt_sleep)
|
|
if self.is_at_target():
|
|
if self.config.verbose_move:
|
|
print(f"Move finished. Had to poke the cube {attempt+1}x to get there though.")
|
|
return SUCCESS
|
|
|
|
print("Position reached within 2um, but not better.")
|
|
return TIMEOUT
|
|
|
|
if self.is_at_target():
|
|
return SUCCESS
|
|
|
|
return UNKNOWN_ERROR
|
|
|
|
def move(self, val, relative=False, wait=True):
|
|
"""'Same as set target value, just for convenience"""
|
|
return self.set_target_value(val, relative=relative, wait=wait)
|
|
|
|
def gui(self):
|
|
device, motor = self.ID.split(":")
|
|
cmd = f'caqtdm -macro "DEVICE={device}" S_ANC350.ui'
|
|
return subprocess.Popen(cmd, shell=True)
|
|
|
|
|
|
class AttocubeError(AdjustableError):
|
|
pass
|