large reorganization, part 1 done

This commit is contained in:
2024-05-27 16:09:46 +02:00
parent ffbfa2ba92
commit 90e12fc814
42 changed files with 1371 additions and 43856 deletions

View File

@ -1,39 +0,0 @@
from slic.core.adjustable import PVAdjustable
from slic.utils import typename
class AlignmentLaser:
"""Class for the alignment laser"""
def __init__(self, ID, name="Cristallina alignment laser"):
self.ID = ID
self.name = name or ID
pvname_setvalue = ID
pvname_readback = ID
self._adj = PVAdjustable(pvname_setvalue, pvname_readback, accuracy=0, internal=True)
def set_in(self):
self._adj.set_target_value(-19).wait()
def set_out(self):
self._adj.set_target_value(-1).wait()
@property
def status(self):
state = self._adj.get_current_value()
if state is None:
return "not connected"
elif state < -18:
return "In"
elif state > -6:
return "Out"
else:
return "in an undefined position"
# TODO: Save the status downstream of the beamline when laser is out before bringing it back in
def get_downstream_status(self):
return
def __repr__(self):
tn = typename(self)
return f'{tn} "{self.name}" is {self.status}'

View File

@ -1,383 +0,0 @@
from distutils.command.install_egg_info import to_filename
from re import S
from shutil import move
from tarfile import is_tarfile
import time
import subprocess
from turtle import pos
from types import SimpleNamespace
from enum import IntEnum
from epics import ca
# import slic
from slic.core.adjustable import Adjustable, AdjustableError
from slic.utils import typename
from slic.utils.printing import printable_dict
from slic.utils.hastyepics import get_pv as PV
# from ..basedevice import BaseDevice
from slic.core.device.basedevice import BaseDevice
class AttocubeStage(BaseDevice):
def __init__(self, name=None, **axis_ids):
self.name = name
self.axis_ids = axis_ids
self.axes = {}
for ax_name, ax_id in axis_ids.items():
record_name = f"{name}: {ax_name}"
ax = AttocubeAxis(ax_id, name=record_name)
setattr(self, ax_name, ax)
self.axes[ax_name] = ax
def __repr__(self):
tname = typename(self)
name = self.name
head = f'{tname} "{name}"'
to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()}
return printable_dict(to_print, head)
# Commments after talking to Sven:
# 1. give values in init and then fill self.config
# 2. set_target_value() is the method for the scanner. Therefore, update it so that it has move() inside it.
# 3. is_moving() is the other part for the scanner, it will need updating as well
# 4. after that scanning should work properly
# 5. For hole drilling something like this would make sense:
# t = daq.aquire(...)
# for pos in []:
# mot.set_target_value(pos).wait()
# sleep()
# t.stop()
# where .acqure() has a the event_map - basically for every pulse id it shows whether the event was on or off. this way we can search for pulse ids for every drilling shot if we wanted to.
# event map channel is probably somewhere on the list when typing: " ioc records .*Evt.* "
class AttocubeAxis(Adjustable):
def __init__(
self,
ID,
name=None,
units=None,
internal=False,
tolerance=0.3,
timeout=300.0,
pos_check_delay=0.15,
target_gnd=None,
move_attempts=5,
move_attempt_sleep=4,
verbose_move=False,
ignore_limits=True,
confirm_move=True,
):
super().__init__(ID, name=name, units=units, internal=internal)
self.wait_time = 0.1
self._move_requested = False
self.timeout = timeout
self.config = SimpleNamespace(
tolerance=tolerance,
timeout=timeout,
pos_check_delay=pos_check_delay,
target_gnd=target_gnd,
move_attempts=move_attempts,
move_attempt_sleep=move_attempt_sleep,
verbose_move=verbose_move,
ignore_limits=ignore_limits,
confirm_move=confirm_move,
)
self.pvs = SimpleNamespace(
drive=PV(ID + "-SET_TARGET"),
readback=PV(ID + "-POS"),
amplitude=PV(ID + "-AMPL_SET"),
amplitude_rb=PV(ID + "-AMPL_RB"),
frequency=PV(ID + "-FREQ_SET"),
frequency_rb=PV(ID + "-FREQ_RB"),
moving_status=PV(ID + "-MOVING"),
hlm=PV(ID + ":HLM"), # not implemented yet, but hopefully will be added later
llm=PV(ID + ":LLM"), # not implemented yet, but hopefully will be added later
stop=PV(ID + "-STOP_AUTO_CMD"),
move=PV(ID + "-MV_ABS_SET"),
stop_auto_cmd=PV(ID + "-STOP_AUTO_CMD"),
target_tol_rb=PV(ID + "-TARGET_RANGE"),
target_tol_set=PV(ID + "-SET_TARGET_RANGE"),
target_reached=PV(ID + "-TARGET_REACHED_RB"),
target_ground=PV(ID + "-TARGET_GND_SET"),
output_enabled=PV(ID + "-ENABLE_SET"),
units=PV(ID + "-UNIT"),
output_RB=PV(ID + "-ENABLE_RB"),
)
@property
def units(self):
units = self._units
if units is not None:
return units
return self.pvs.units.get()
@units.setter
def units(self, value):
self._units = value
def disable_output(self, verbose=True):
old_value = self.pvs.output_RB.value
self.pvs.output_enabled.put(0)
if verbose:
if old_value == 1:
print("Output has been disabled")
else:
print("Output has been off and stays that way")
def enable_output(self, verbose=True):
old_value = self.pvs.output_RB.value
self.pvs.output_enabled.put(1)
if verbose:
if old_value == 0:
print("Output has been enabled")
else:
print("Output has been on and stays that way")
def get_ground_on_targert(self):
return self.pvs.target_ground.value
def get_amplitude(self, verbose=False):
amplitude = self.pvs.amplitude_rb.value
if verbose:
print(f"Drive amplitude is {amplitude/1000} V")
return amplitude / 1000
def set_amplitude(self, value, verbose=False, check=False):
"""Set drive amplitude for the axis in Volts. Add check=True for checking if it happened (takes 2s)."""
self.pvs.amplitude.put(value * 1000)
if check:
time.sleep(2)
assert self.pvs.amplitude_rb.value == value * 1000, "drive amplitude readback does not match set value"
if verbose:
print(f"Drive amplitude is set to {value} V")
def get_frequency(self, verbose=False):
frequency = self.pvs.frequency_rb.value
if verbose:
print(f"Drive frequency is {frequency} V")
return frequency
def set_frequency(self, value, verbose=False, check=False):
"""Set drive frequency for the axis in Hz. Add check=True for checking if it happened (takes 2s)."""
self.pvs.frequency.put(value)
if check:
time.sleep(2)
assert self.pvs.frequency_rb.value == value, "drive frequency readback does not match set value"
if verbose:
print(f"Drive frequency is set to {value} Hz")
def set_ground_on_target(self, value, verbose=True):
if value == True or value == 1:
self.pvs.target_ground.put(1)
if verbose:
print("grounding upon target arrival is now active")
elif value == False or value == 0:
self.pvs.target_ground.put(0)
if verbose:
print("grounding upon target arrival has been deactivated")
else:
raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}")
def get_target_tolerance(self):
return self.pvs.target_tol_rb.value
def set_target_tolerance(self, value):
self.pvs.target_tol_set.put(value)
print(f"target tolerance set to {value} {self.pvs.units.char_value}")
def get_current_value(self, readback=True):
if readback:
return self.pvs.readback.get()
else:
return self.pvs.drive.get()
def stop(self):
self._move_requested = False
self.pvs.stop.put(1, wait=True)
def is_output_on(self):
return self.pvs.output_RB.value == 1
def is_moving(self):
return self.is_at_target()
def is_at_target(self, from_pv=False, tolerance=None):
"""Check whether target was reached.
If from_pv is True TARGET_REACHED_RB is used, else it's calculated as diffrence between the readback and set value within tolerance.
Default is False, because otherwise tolerance can only be set and read out as an integer."""
if tolerance == None:
tolerance = self.config.tolerance
if from_pv == True:
return self.pvs.target_reached.value == 1
else:
return abs(self.pvs.readback.get() - self.pvs.drive.get()) < tolerance
def allow_move(self):
self.pvs.move.put(1, wait=True)
def forbid_move(self):
self.pvs.move.put(0, wait=True)
def set_move_allowed(self, value):
if value == True or value == 1:
self.pvs.move.put(1)
elif value == False or value == 0:
self.pvs.move.put(0)
else:
raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}")
def within_epics_limits(self, val):
low, high = self.get_epics_limits()
return low <= val <= high
def get_epics_limits(self):
low = self.pvs.llm.get()
high = self.pvs.hlm.get()
return low, high
def set_epics_limits(self, low, high, relative_to_current=False):
low = -np.inf if low is None else low
high = +np.inf if high is None else high
if relative_to_current:
val = self.get_current_value()
low += val
high += val
self.pvs.llm.put(low)
self.pvs.hlm.put(high)
###
# Things to be desired from the epics module:
# 1. Set SET_TARGET_RANGE as a float or somehow alow decimals to be entered
# 2. Soft epics limits
def set_target_value(self, val, relative=False, wait=True):
"""
moves Attocube drive to position (emulating pyepics Motor class)
arguments:
==========
val value to move to (float) [Must be provided]
relative move relative to current position (T/F) [F]
wait whether to wait for move to complete (T/F) [F]
return values:
==============
-13 : invalid value (cannot convert to float). Move not attempted.
-12 : target value outside soft limits. Move not attempted.
-11 : drive PV is not connected. Move not attempted.
-8 : move started, but timed-out.
# -7 : move started, timed-out, but appears done.
-5 : move started, unexpected return value from PV.put().
# -4 : move-with-wait finished, soft limit violation seen.
# -3 : move-with-wait finished, hard limit violation seen.
0 : move-with-wait finished OK.
0 : move-without-wait executed, move not confirmed.
1 : move-without-wait executed, move confirmed.
# 3 : move-without-wait finished, hard limit violation seen.
# 4 : move-without-wait finished, soft limit violation seen.
"""
INVALID_VALUE = -13
OUTSIDE_LIMITS = -12
NOT_CONNECTED = -11
TIMEOUT = -8
UNKNOWN_ERROR = -5
SUCCESS = 0
EXECUTED = 0
CONFIRMED = 1
PUT_SUCCESS = 1
PUT_TIMEOUT = -1
try:
val = float(val)
except Exception:
return INVALID_VALUE
if relative:
val += self.pvs.drive.get()
if not self.config.ignore_limits:
if not self.within_epics_limits(val):
return OUTSIDE_LIMITS
put_stat = self.pvs.drive.put(val, wait=wait, timeout=self.config.timeout)
if not self.is_output_on():
self.enable_output(verbose=False)
if self.config.target_gnd == True or self.config.target_gnd == 1:
self.set_ground_on_target(True, verbose=False)
elif self.config.target_gnd == False or self.config.target_gnd == 0:
self.set_ground_on_target(False, verbose=False)
self.allow_move()
if put_stat is None:
return NOT_CONNECTED
if wait and put_stat == PUT_TIMEOUT:
return TIMEOUT
if put_stat != PUT_SUCCESS:
return UNKNOWN_ERROR
t0 = time.time()
tstart = t0 + min(self.config.timeout, 10)
tout = t0 + self.config.timeout
if not wait and not self.config.confirm_move:
return EXECUTED
if not wait:
return CONFIRMED
if time.time() > tout:
return TIMEOUT
# Wait before checking if target value reached. It's necessary as sometimes this PV takes a while to set for the new target value.
time.sleep(self.config.pos_check_delay)
while self.is_at_target() == False and time.time() <= tout:
# If the value is near the last 2um. Move the cube to setpoint a few more times to get to the right position.
if self.is_at_target(tolerance=2):
for attempt in range(self.config.move_attempts):
if self.config.verbose_move:
print(f"move attempt: {attempt+1}")
self.pvs.move.put(1, wait=True)
time.sleep(self.config.move_attempt_sleep)
if self.is_at_target():
if self.config.verbose_move:
print(f"Move finished. Had to poke the cube {attempt+1}x to get there though.")
return SUCCESS
print("Position reached within 2um, but not better.")
return TIMEOUT
if self.is_at_target():
return SUCCESS
return UNKNOWN_ERROR
def move(self, val, relative=False, wait=True):
"""'Same as set target value, just for convenience"""
return self.set_target_value(val, relative=relative, wait=wait)
def gui(self):
device, motor = self.ID.split(":")
cmd = f'caqtdm -macro "DEVICE={device}" S_ANC350.ui'
return subprocess.Popen(cmd, shell=True)
class AttocubeError(AdjustableError):
pass

View File

@ -1,37 +0,0 @@
from devices.attocube import AttocubeStage
attocube = AttocubeStage(
"SARES30-ATTOCUBE",
Y="SARES30-ATTOCUBE:A2",
X="SARES30-ATTOCUBE:A1",
)
# def make_AttocubeStage(name):
# return AttocubeStage(name,
# Z=name+':A0',
# X=name+':A1',
# )
# class NamedAttocubeStage(AttocubeStage):
# def __init__(self, name):
# super().__init__(name,
# Z=name+':A0',
# X=name+':A1',
# )
# def set_target_value(self, value):
# t1 = self.stage1.set_target_value(value)
# t2 = self.stage2.set_target_value(-value)
# t1.wait()
# t2.wait()
# def is_moving(self):
# return any(self.stage1.is_moving(), self.stage2.is_moving())
# t = daq.aquire(...)
# for pos in []:
# mot.set_target_value(pos).wait()
# sleep()
# t.stop()

View File

@ -1,14 +0,0 @@
from slic.core.adjustable import Adjustable
class MyNewCoolThing(Adjustable):
pos = 0
def get_current_value(self):
return self.pos
def set_target_value(self, value):
self.pos = value
def is_moving(self):
return False # OK OK, this is probably cheating ;)

View File

@ -1,50 +0,0 @@
""" Diffractometer
motorized axis:
SARES30-CPCL-ECMC02:ROT2THETA
SARES30-CPCL-ECMC02:ROTTHETA
SARES30-CPCL-ECMC02:TRXBASE
SARES30-CPCL-ECMC02:TRYBASE
"""
from slic.core.adjustable import Adjustable, Linked
from slic.core.device import Device, SimpleDevice
from slic.devices.general.motor import Motor
class Diffractometer(Device):
def __init__(self, ID, motor_naming="MOTOR", **kwargs):
super().__init__(ID, **kwargs)
self.twotheta = Motor("SARES30-CPCL-ECMC02:ROT2THETA") # , ID=None, name=None, units=None, internal=False):
self.theta = Motor("SARES30-CPCL-ECMC02:ROTTHETA") # , ID=None, name=None, units=None, internal=False):
self.trx_base = Motor("SARES30-CPCL-ECMC02:TRXBASE") # , ID=None, name=None, units=None, internal=False):
self.try_base = Motor("SARES30-CPCL-ECMC02:TRYBASE") # , ID=None, name=None, units=None, internal=False):
self.tr_x = Motor("SARES30-CPCL-ECMC02:TRX")
self.tr_y = Motor("SARES30-CPCL-ECMC02:TRY")
self.tr_z = Motor("SARES30-CPCL-ECMC02:TRZ")
self.td = Motor("SARES30-CPCL-ECMC02:TD")
# Set speed:
# diffractometer.theta._motor.VELO = 0.25
class ThetasCombined(Linked):
def __init__(self, *args, **kwargs):
super().__init__(self, *args, **kwargs)
def connect_axis(self):
"""
calculate offset to match scale factor
"""
offset = self.secondary.get_current_value() - self.primary.get_current_value() * self.scale_factor
self.offset = offset

View File

@ -1,125 +0,0 @@
""" DilSc prototype
"""
from slic.core.adjustable import Adjustable
from slic.core.device import Device, SimpleDevice
from frappy.client import SecopClient
from frappy import states
from frappy.datatypes import StatusType
class Dilution(Device):
def __init__(self, **kwargs):
self.name = 'DilSc'
ID = self.name
super().__init__(ID, **kwargs)
self.address = 'dilsc.psi.ch:5000'
self.dilsc = SecopClient(self.address)
self.dilsc.connect()
self.x = MagnetCoil("X", self.dilsc, 'x', limit_low=-0.6, limit_high=0.6)
self.y = MagnetCoil("Y", self.dilsc, 'y', limit_low=-0.6, limit_high=0.6)
self.z = MagnetCoil("Z", self.dilsc, 'z', limit_low=-5.2, limit_high=5.2)
self.T_plato = Thermometer('T_plato', self.dilsc, limit_low=0, limit_high=300)
self.T_pucksensor = Thermometer('T_pucksensor', self.dilsc, limit_low=0, limit_high=300)
class Thermometer(Adjustable):
def __init__(self, name, dilsc_connection, limit_low=-0.0001, limit_high=0.0001):
super().__init__(name, limit_low=limit_low, limit_high=limit_high)
self.dilsc = dilsc_connection
def _check_connection(func):
def checker(self, *args, **kwargs):
if not self.dilsc.online:
raise ConnectionError(f'No connection to dilsc at {self.address}')
else:
return func(self, *args, **kwargs)
return checker
@_check_connection
def get_current_value(self):
cacheitem = self.dilsc.getParameter(f'{self.name}', 'value', trycache=False)
return cacheitem.value
@_check_connection
def set_target_value(self, value):
self.dilsc.setParameter(f'{self.name}', 'target', value)
@_check_connection
def get_target_value(self):
cacheitem = self.dilsc.getParameter(f'{self.name}', 'target', trycache=False)
return cacheitem.value
@_check_connection
def is_moving(self):
response = self.dilsc.getParameter(f'{self.name}','status', trycache=False)
return response[0][0] > StatusType.PREPARED
@_check_connection
def get_PID_parameters(self):
""" Returns the current PID parameters associated with the control loop for
this thermometer.
"""
response = self.dilsc.getParameter(f'{self.name}','ctrlpars', trycache=False)
return response
@_check_connection
def set_PID_parameters(self, p, i, d):
""" Sets the PID parameters for the associated control loop.
TODO:
- This still returns a timeout error but sets the correct values.
- The range is limited to less than the Lakeshore range allows, this needs
to be fixed in frappy.
"""
self.dilsc.setParameter(f'{self.name}', 'ctrlpars', {'p': p, 'i': i, 'd': d})
class MagnetCoil(Adjustable):
def __init__(self, name, dilsc_connection, direction, limit_low=-0.0001, limit_high=0.0001):
super().__init__(name, limit_low=-0.0001, limit_high=0.0001)
self.direction = direction.lower()
if self.direction not in ["x", "y", "z"]:
raise ValueError("Direction must be either x, y or z.")
self.dilsc = dilsc_connection
def _check_connection(func):
def checker(self, *args, **kwargs):
if not self.dilsc.online:
raise ConnectionError(f'No connection to dilsc at {self.address}')
else:
return func(self, *args, **kwargs)
return checker
@_check_connection
def get_current_value(self):
cacheitem = self.dilsc.getParameter(f'mf{self.direction}', 'value', trycache=False)
return cacheitem.value
@_check_connection
def set_target_value(self, value):
self.dilsc.setParameter(f'mf{self.direction}', 'target', value)
@_check_connection
def is_moving(self):
response = self.dilsc.getParameter(f'mf{self.direction}','status', trycache=False)
return response[0][0] > StatusType.PREPARED

View File

@ -1,18 +0,0 @@
from slic.core.device import Device, SimpleDevice
from slic.devices.general.motor import Motor
class SlitUnitCenterWidthJJ(Device):
def __init__(self, ID, motor_naming="MOTOR", **kwargs):
super().__init__(ID, **kwargs)
self.x = SimpleDevice(
ID + "-X", center=Motor(ID + ":" + motor_naming + "_X"), width=Motor(ID + ":" + motor_naming + "_W")
)
self.y = SimpleDevice(
ID + "-Y", center=Motor(ID + ":" + motor_naming + "_Y"), width=Motor(ID + ":" + motor_naming + "_H")
)
jjslits = SlitUnitCenterWidthJJ("SARES30-MOBI2", motor_naming="MOT")

View File

@ -1,6 +0,0 @@
from slic.core.device.simpledevice import SimpleDevice
from slic.devices.general.motor import Motor
newport_z = Motor("SARES30-MOBI1:MOT_5")
newport = SimpleDevice("Newport linear stage base motors", z=newport_z)

View File

@ -1,30 +0,0 @@
from slic.core.adjustable import PVAdjustable
from slic.utils import typename
class PP_Shutter:
"""Class for when pulse picker is used as a shutter. Opening and closing is done with turning on and off the event reciever universal output ON and OFF."""
def __init__(self, ID, name="X-ray Pulse Picker Shutter"):
self.ID = ID
self.name = name or ID
pvname_setvalue = ID
pvname_readback = ID
self._adj = PVAdjustable(pvname_setvalue, pvname_readback, accuracy=0, internal=True)
def close(self):
self._adj.set_target_value(0).wait()
def open(self):
self._adj.set_target_value(1).wait()
@property
def status(self):
state = self._adj.get_current_value()
if state is None:
return "not connected"
return "open" if state else "closed"
def __repr__(self):
tn = typename(self)
return f'{tn} "{self.name}" is {self.status}'

View File

@ -1,35 +0,0 @@
from epics import PV
def pp_normal(opening = True):
""" Set pulse picker to fixed open or closed mode.
"""
PV("SARES30-LTIM01-EVR0:RearUniv0-Ena-SP").put("Disabled")
PV("SARES30-LTIM01-EVR0:RearUniv0_SOURCE").put("HIGH")
PV("SARES30-LTIM01-EVR0:RearUniv0_SOURCE2").put("HIGH")
if opening:
PV("SARES30-LTIM01-EVR0:RearUniv0-Ena-SP").put("Enabled")
else:
PV("SARES30-LTIM01-EVR0:RearUniv0-Ena-SP").put("Disabled")
def pp_cta():
""" Set pulse picker to use the CTA as control input.
(check connections and parameters!)
"""
PV("SARES30-LTIM01-EVR0:RearUniv0-Ena-SP").put("Disabled")
PV("SARES30-LTIM01-EVR0:RearUniv0_SOURCE").put("PULSER")
PV("SARES30-LTIM01-EVR0:RearUniv0_SOURCE2").put("PULSER")
PV("SARES30-LTIM01-EVR0:RearUniv0_SNUMPD").put("3")
PV("SARES30-LTIM01-EVR0:RearUniv0_SNUMPD2").put("3")
PV("SARES30-LTIM01-EVR0:RearUniv0-Ena-SP").put("Enabled")

View File

@ -1,288 +0,0 @@
import time
import subprocess
from types import SimpleNamespace
from enum import IntEnum
from epics import ca
# import slic
from slic.core.adjustable import Adjustable, AdjustableError
from slic.utils import typename
from slic.utils.printing import printable_dict
from slic.utils.hastyepics import get_pv as PV
# from ..basedevice import BaseDevice
from slic.core.device.basedevice import BaseDevice
class Status(IntEnum):
STOPPED = 0
STEPPING = 1
SCANNING = 2
HOLDING = 3
TARGETING = 4
MOVE_DELAY = 5
CALIBRATING = 6
FINDING_REF = 7
LOCKED = 8
class SmarActStage(BaseDevice):
def __init__(self, name=None, **axis_ids):
self.name = name
self.axis_ids = axis_ids
self.axes = {}
for ax_name, ax_id in axis_ids.items():
record_name = f"{name}: {ax_name}"
ax = SmarActAxis(ax_id, name=record_name)
setattr(self, ax_name, ax)
self.axes[ax_name] = ax
def __repr__(self):
tname = typename(self)
name = self.name
head = f'{tname} "{name}"'
to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()}
return printable_dict(to_print, head)
class SmarActAxis(Adjustable):
def __init__(self, ID, name=None, units=None, internal=False, tolerance=0.5):
super().__init__(ID, name=name, units=units, internal=internal)
self.wait_time = 0.1
self.timeout = 60
self._move_requested = False
self.config = SimpleNamespace(
tolerance=tolerance,
)
self.pvs = SimpleNamespace(
drive=PV(ID + ":DRIVE"),
readback=PV(ID + ":MOTRBV"),
hlm=PV(ID + ":HLM"),
llm=PV(ID + ":LLM"),
status=PV(ID + ":STATUS"),
set_pos=PV(ID + ":SET_POS"),
stop=PV(ID + ":STOP.PROC"),
hold=PV(ID + ":HOLD"),
twv=PV(ID + ":TWV"),
units=PV(ID + ":DRIVE.EGU"),
)
@property
def units(self):
units = self._units
if units is not None:
return units
return self.pvs.units.get()
@units.setter
def units(self, value):
self._units = value
def get_current_value(self, readback=True):
if readback:
return self.pvs.readback.get()
else:
return self.pvs.drive.get()
def reset_current_value_to(self, value):
return self.pvs.set_pos.put(value)
def set_target_value(self, value):
print(f"moving to {value}")
wait_time = self.wait_time
timeout = self.timeout + time.time()
self._move_requested = True
self.pvs.drive.put(value, wait=True)
# wait for start
while self._move_requested and not self.is_moving():
time.sleep(wait_time)
if time.time() >= timeout:
tname = typename(self)
self.stop()
raise SmarActError(f'starting to move {tname} "{self.name}" to {value} {self.units} timed out')
# wait for move done
while self._move_requested and self.is_moving():
if self.is_holding(): # holding == arrived at target!
break
time.sleep(wait_time)
self._move_requested = False
print("move done")
def is_at_target(self, from_pv=False, tolerance=None):
"""Check whether target was reached.
If from_pv is True TARGET_REACHED_RB is used, else it's calculated as diffrence between the readback and set value within tolerance.
Default is False, because otherwise tolerance can only be set and read out as an integer."""
if tolerance == None:
tolerance = self.config.tolerance
if from_pv == True:
return self.pvs.target_reached.value == 1
else:
return abs(self.pvs.readback.get() - self.pvs.drive.get()) < tolerance
def stop(self):
self._move_requested = False
self.pvs.stop.put(1, wait=True)
def is_moving(self):
return self.status != Status.STOPPED
def is_holding(self):
return self.status == Status.HOLDING
@property
def status(self):
return self.pvs.status.get()
def within_epics_limits(self, val):
low, high = self.get_epics_limits()
return low <= val <= high
def get_epics_limits(self):
low = self.pvs.llm.get()
high = self.pvs.hlm.get()
return low, high
def set_epics_limits(self, low, high, relative_to_current=False):
low = -np.inf if low is None else low
high = +np.inf if high is None else high
if relative_to_current:
val = self.get_current_value()
low += val
high += val
self.pvs.llm.put(low)
self.pvs.hlm.put(high)
def move(self, val, relative=False, wait=True, ignore_limits=False, confirm_move=True, timeout=300.0):
"""
moves SmarAct drive to position (emulating pyepics Motor class)
arguments:
==========
val value to move to (float) [Must be provided]
relative move relative to current position (T/F) [F]
wait whether to wait for move to complete (T/F) [F]
ignore_limits try move without regard to limits (T/F) [F]
confirm_move try to confirm that move has begun (T/F) [F]
timeout max time for move to complete (in seconds) [300]
return values:
==============
-13 : invalid value (cannot convert to float). Move not attempted.
-12 : target value outside soft limits. Move not attempted.
-11 : drive PV is not connected. Move not attempted.
-8 : move started, but timed-out.
# -7 : move started, timed-out, but appears done.
-5 : move started, unexpected return value from PV.put().
# -4 : move-with-wait finished, soft limit violation seen.
# -3 : move-with-wait finished, hard limit violation seen.
0 : move-with-wait finished OK.
0 : move-without-wait executed, move not confirmed.
1 : move-without-wait executed, move confirmed.
# 3 : move-without-wait finished, hard limit violation seen.
# 4 : move-without-wait finished, soft limit violation seen.
"""
INVALID_VALUE = -13
OUTSIDE_LIMITS = -12
NOT_CONNECTED = -11
TIMEOUT = -8
UNKNOWN_ERROR = -5
SUCCESS = 0
EXECUTED = 0
CONFIRMED = 1
PUT_SUCCESS = 1
PUT_TIMEOUT = -1
try:
val = float(val)
except Exception:
return INVALID_VALUE
if relative:
val += self.pvs.drive.get()
if not ignore_limits:
if not self.within_epics_limits(val):
return OUTSIDE_LIMITS
put_stat = self.pvs.drive.put(val, wait=wait, timeout=timeout)
if put_stat is None:
return NOT_CONNECTED
if wait and put_stat == PUT_TIMEOUT:
return TIMEOUT
if put_stat != PUT_SUCCESS:
return UNKNOWN_ERROR
stat = self.status
t0 = time.time()
thold = t0 + self.pvs.hold.get() * 0.001
tstart = t0 + min(timeout, 10)
tout = t0 + timeout
if not wait and not confirm_move:
return EXECUTED
while stat == Status.HOLDING and time.time() <= thold:
ca.poll(evt=1.0e-2)
stat = self.status
while stat == Status.STOPPED and time.time() <= tstart:
ca.poll(evt=1.0e-2)
stat = self.status
if stat != Status.TARGETING:
if time.time() > tout:
return TIMEOUT
else:
return UNKNOWN_ERROR
if not wait:
return CONFIRMED
while stat == Status.TARGETING and time.time() <= tout:
ca.poll(evt=1.0e-2)
stat = self.status
if stat not in (Status.HOLDING, Status.TARGETING):
return UNKNOWN_ERROR
if time.time() > tout:
return TIMEOUT
twv = self.pvs.twv.get()
twv = abs(twv)
ca.poll(evt=1.0e-1)
while stat == Status.HOLDING and time.time() <= tout:
ca.poll(evt=1.0e-2)
stat = self.status
delta = self.pvs.readback.get() - val
delta = abs(delta)
if delta < twv:
return SUCCESS
return UNKNOWN_ERROR
def gui(self):
device, motor = self.ID.split(":")
cmd = f'caqtdm -macro "P={device}:,M={motor}" SARES30-CPCL-SMARACTX1_6axis_old.ui'
return subprocess.Popen(cmd, shell=True)
class SmarActError(AdjustableError):
pass

View File

@ -1,32 +0,0 @@
# from slic.devices.general.smaract import SmarActStage
from devices.smaract import SmarActStage
from slic.core.device.simpledevice import SimpleDevice
from slic.devices.general.motor import Motor
# this currently uses a modified SmarActStage module
# otherwise the wait times are not working correctly.
smaract_z = Motor("SARES30-MCS2750:MOT_1")
smaract_x = Motor("SARES30-MCS2750:MOT_2")
smaract_y = Motor("SARES30-MCS2750:MOT_3")
smaract_mini_XYZ = SimpleDevice("XYZ smaract stage (mini)", x=smaract_x, y=smaract_y, z=smaract_z)
smaract_Juraj = SmarActStage(
"SARES30-XSMA156",
X="SARES30-XSMA156:X",
Y="SARES30-XSMA156:Y",
Z="SARES30-XSMA156:Z",
Ry="SARES30-XSMA156:Ry",
Rx="SARES30-XSMA156:Rx",
Rz="SARES30-XSMA156:Rz",
)
# smaract_mini_XYZ = SmarActStage("SARES30-MCS2750",
# X='SARES30-MCS2750:MOT_1',
# Y='SARES30-MCS2750:MOT_2',
# Z='SARES30-MCS2750:MOT_3',
# )

View File

@ -1,7 +0,0 @@
from slic.core.device.simpledevice import SimpleDevice
from slic.devices.general.motor import Motor
standa_z = Motor("SARES30-MOBI1:MOT_1")
standa_x = Motor("SARES30-MOBI1:MOT_2")
standa = SimpleDevice("Standa X-ray eye base motors", x=standa_x, z=standa_z)