Files
cristallina/gp_exp/attocube.py

384 lines
13 KiB
Python
Executable File

from distutils.command.install_egg_info import to_filename
from re import S
from shutil import move
from tarfile import is_tarfile
import time
import subprocess
from turtle import pos
from types import SimpleNamespace
from enum import IntEnum
from epics import ca
# import slic
from slic.core.adjustable import Adjustable, AdjustableError
from slic.utils import typename
from slic.utils.printing import printable_dict
from slic.utils.hastyepics import get_pv as PV
# from ..basedevice import BaseDevice
from slic.core.device.basedevice import BaseDevice
class AttocubeStage(BaseDevice):
def __init__(self, name=None, **axis_ids):
self.name = name
self.axis_ids = axis_ids
self.axes = {}
for ax_name, ax_id in axis_ids.items():
record_name = f"{name}: {ax_name}"
ax = AttocubeAxis(ax_id, name=record_name)
setattr(self, ax_name, ax)
self.axes[ax_name] = ax
def __repr__(self):
tname = typename(self)
name = self.name
head = f'{tname} "{name}"'
to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()}
return printable_dict(to_print, head)
# Commments after talking to Sven:
# 1. give values in init and then fill self.config
# 2. set_target_value() is the method for the scanner. Therefore, update it so that it has move() inside it.
# 3. is_moving() is the other part for the scanner, it will need updating as well
# 4. after that scanning should work properly
# 5. For hole drilling something like this would make sense:
# t = daq.aquire(...)
# for pos in []:
# mot.set_target_value(pos).wait()
# sleep()
# t.stop()
# where .acqure() has a the event_map - basically for every pulse id it shows whether the event was on or off. this way we can search for pulse ids for every drilling shot if we wanted to.
# event map channel is probably somewhere on the list when typing: " ioc records .*Evt.* "
class AttocubeAxis(Adjustable):
def __init__(
self,
ID,
name=None,
units=None,
internal=False,
tolerance=0.3,
timeout=300.0,
pos_check_delay=0.15,
target_gnd=None,
move_attempts=5,
move_attempt_sleep=4,
verbose_move=False,
ignore_limits=True,
confirm_move=True,
):
super().__init__(ID, name=name, units=units, internal=internal)
self.wait_time = 0.1
self._move_requested = False
self.timeout = timeout
self.config = SimpleNamespace(
tolerance=tolerance,
timeout=timeout,
pos_check_delay=pos_check_delay,
target_gnd=target_gnd,
move_attempts=move_attempts,
move_attempt_sleep=move_attempt_sleep,
verbose_move=verbose_move,
ignore_limits=ignore_limits,
confirm_move=confirm_move,
)
self.pvs = SimpleNamespace(
drive=PV(ID + "-SET_TARGET"),
readback=PV(ID + "-POS"),
amplitude=PV(ID + "-AMPL_SET"),
amplitude_rb=PV(ID + "-AMPL_RB"),
frequency=PV(ID + "-FREQ_SET"),
frequency_rb=PV(ID + "-FREQ_RB"),
moving_status=PV(ID + "-MOVING"),
hlm=PV(ID + ":HLM"), # not implemented yet, but hopefully will be added later
llm=PV(ID + ":LLM"), # not implemented yet, but hopefully will be added later
stop=PV(ID + "-STOP_AUTO_CMD"),
move=PV(ID + "-MV_ABS_SET"),
stop_auto_cmd=PV(ID + "-STOP_AUTO_CMD"),
target_tol_rb=PV(ID + "-TARGET_RANGE"),
target_tol_set=PV(ID + "-SET_TARGET_RANGE"),
target_reached=PV(ID + "-TARGET_REACHED_RB"),
target_ground=PV(ID + "-TARGET_GND_SET"),
output_enabled=PV(ID + "-ENABLE_SET"),
units=PV(ID + "-UNIT"),
output_RB=PV(ID + "-ENABLE_RB"),
)
@property
def units(self):
units = self._units
if units is not None:
return units
return self.pvs.units.get()
@units.setter
def units(self, value):
self._units = value
def disable_output(self, verbose=True):
old_value = self.pvs.output_RB.value
self.pvs.output_enabled.put(0)
if verbose:
if old_value == 1:
print("Output has been disabled")
else:
print("Output has been off and stays that way")
def enable_output(self, verbose=True):
old_value = self.pvs.output_RB.value
self.pvs.output_enabled.put(1)
if verbose:
if old_value == 0:
print("Output has been enabled")
else:
print("Output has been on and stays that way")
def get_ground_on_targert(self):
return self.pvs.target_ground.value
def get_amplitude(self, verbose=False):
amplitude = self.pvs.amplitude_rb.value
if verbose:
print(f"Drive amplitude is {amplitude/1000} V")
return amplitude / 1000
def set_amplitude(self, value, verbose=False, check=False):
"""Set drive amplitude for the axis in Volts. Add check=True for checking if it happened (takes 2s)."""
self.pvs.amplitude.put(value * 1000)
if check:
time.sleep(2)
assert self.pvs.amplitude_rb.value == value * 1000, "drive amplitude readback does not match set value"
if verbose:
print(f"Drive amplitude is set to {value} V")
def get_frequency(self, verbose=False):
frequency = self.pvs.frequency_rb.value
if verbose:
print(f"Drive frequency is {frequency} V")
return frequency
def set_frequency(self, value, verbose=False, check=False):
"""Set drive frequency for the axis in Hz. Add check=True for checking if it happened (takes 2s)."""
self.pvs.frequency.put(value)
if check:
time.sleep(2)
assert self.pvs.frequency_rb.value == value, "drive frequency readback does not match set value"
if verbose:
print(f"Drive frequency is set to {value} Hz")
def set_ground_on_target(self, value, verbose=True):
if value == True or value == 1:
self.pvs.target_ground.put(1)
if verbose:
print("grounding upon target arrival is now active")
elif value == False or value == 0:
self.pvs.target_ground.put(0)
if verbose:
print("grounding upon target arrival has been deactivated")
else:
raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}")
def get_target_tolerance(self):
return self.pvs.target_tol_rb.value
def set_target_tolerance(self, value):
self.pvs.target_tol_set.put(value)
print(f"target tolerance set to {value} {self.pvs.units.char_value}")
def get_current_value(self, readback=True):
if readback:
return self.pvs.readback.get()
else:
return self.pvs.drive.get()
def stop(self):
self._move_requested = False
self.pvs.stop.put(1, wait=True)
def is_output_on(self):
return self.pvs.output_RB.value == 1
def is_moving(self):
return self.is_at_target()
def is_at_target(self, from_pv=False, tolerance=None):
"""Check whether target was reached.
If from_pv is True TARGET_REACHED_RB is used, else it's calculated as diffrence between the readback and set value within tolerance.
Default is False, because otherwise tolerance can only be set and read out as an integer."""
if tolerance == None:
tolerance = self.config.tolerance
if from_pv == True:
return self.pvs.target_reached.value == 1
else:
return abs(self.pvs.readback.get() - self.pvs.drive.get()) < tolerance
def allow_move(self):
self.pvs.move.put(1, wait=True)
def forbid_move(self):
self.pvs.move.put(0, wait=True)
def set_move_allowed(self, value):
if value == True or value == 1:
self.pvs.move.put(1)
elif value == False or value == 0:
self.pvs.move.put(0)
else:
raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}")
def within_epics_limits(self, val):
low, high = self.get_epics_limits()
return low <= val <= high
def get_epics_limits(self):
low = self.pvs.llm.get()
high = self.pvs.hlm.get()
return low, high
def set_epics_limits(self, low, high, relative_to_current=False):
low = -np.inf if low is None else low
high = +np.inf if high is None else high
if relative_to_current:
val = self.get_current_value()
low += val
high += val
self.pvs.llm.put(low)
self.pvs.hlm.put(high)
###
# Things to be desired from the epics module:
# 1. Set SET_TARGET_RANGE as a float or somehow alow decimals to be entered
# 2. Soft epics limits
def set_target_value(self, val, relative=False, wait=True):
"""
moves Attocube drive to position (emulating pyepics Motor class)
arguments:
==========
val value to move to (float) [Must be provided]
relative move relative to current position (T/F) [F]
wait whether to wait for move to complete (T/F) [F]
return values:
==============
-13 : invalid value (cannot convert to float). Move not attempted.
-12 : target value outside soft limits. Move not attempted.
-11 : drive PV is not connected. Move not attempted.
-8 : move started, but timed-out.
# -7 : move started, timed-out, but appears done.
-5 : move started, unexpected return value from PV.put().
# -4 : move-with-wait finished, soft limit violation seen.
# -3 : move-with-wait finished, hard limit violation seen.
0 : move-with-wait finished OK.
0 : move-without-wait executed, move not confirmed.
1 : move-without-wait executed, move confirmed.
# 3 : move-without-wait finished, hard limit violation seen.
# 4 : move-without-wait finished, soft limit violation seen.
"""
INVALID_VALUE = -13
OUTSIDE_LIMITS = -12
NOT_CONNECTED = -11
TIMEOUT = -8
UNKNOWN_ERROR = -5
SUCCESS = 0
EXECUTED = 0
CONFIRMED = 1
PUT_SUCCESS = 1
PUT_TIMEOUT = -1
try:
val = float(val)
except Exception:
return INVALID_VALUE
if relative:
val += self.pvs.drive.get()
if not self.config.ignore_limits:
if not self.within_epics_limits(val):
return OUTSIDE_LIMITS
put_stat = self.pvs.drive.put(val, wait=wait, timeout=self.config.timeout)
if not self.is_output_on():
self.enable_output(verbose=False)
if self.config.target_gnd == True or self.config.target_gnd == 1:
self.set_ground_on_target(True, verbose=False)
elif self.config.target_gnd == False or self.config.target_gnd == 0:
self.set_ground_on_target(False, verbose=False)
self.allow_move()
if put_stat is None:
return NOT_CONNECTED
if wait and put_stat == PUT_TIMEOUT:
return TIMEOUT
if put_stat != PUT_SUCCESS:
return UNKNOWN_ERROR
t0 = time.time()
tstart = t0 + min(self.config.timeout, 10)
tout = t0 + self.config.timeout
if not wait and not self.config.confirm_move:
return EXECUTED
if not wait:
return CONFIRMED
if time.time() > tout:
return TIMEOUT
# Wait before checking if target value reached. It's necessary as sometimes this PV takes a while to set for the new target value.
time.sleep(self.config.pos_check_delay)
while self.is_at_target() == False and time.time() <= tout:
# If the value is near the last 2um. Move the cube to setpoint a few more times to get to the right position.
if self.is_at_target(tolerance=2):
for attempt in range(self.config.move_attempts):
if self.config.verbose_move:
print(f"move attempt: {attempt+1}")
self.pvs.move.put(1, wait=True)
time.sleep(self.config.move_attempt_sleep)
if self.is_at_target():
if self.config.verbose_move:
print(f"Move finished. Had to poke the cube {attempt+1}x to get there though.")
return SUCCESS
print("Position reached within 2um, but not better.")
return TIMEOUT
if self.is_at_target():
return SUCCESS
return UNKNOWN_ERROR
def move(self, val, relative=False, wait=True):
"""'Same as set target value, just for convenience"""
return self.set_target_value(val, relative=relative, wait=wait)
def gui(self):
device, motor = self.ID.split(":")
cmd = f'caqtdm -macro "DEVICE={device}" S_ANC350.ui'
return subprocess.Popen(cmd, shell=True)
class AttocubeError(AdjustableError):
pass