Files
x11ma/script/devices/LEEM2000.py
gac-x11ma 8aefd82ba7
2024-03-19 16:16:03 +01:00

438 lines
15 KiB
Python

import ch.psi.pshell.serial.TcpDevice as TcpDevice
class LEEM2000(TcpDevice):
    """TCP client for the LEEM2000 microscope control server.

    Every request is a text command terminated by chr(0); the reply is read up
    to the same terminator, which is then removed (see send_receive). The
    commands used by this class are "get", "set", "mne", "nam", "prl", "gmv",
    "tlt", "mmd" and "mmp". For the commands that perform an action, a reply
    of '0' is treated as success and anything else raises an Exception.

    Bare ``except:`` clauses are kept on purpose around device I/O.
    NOTE(review): this script runs under Jython (it imports a Java class);
    presumably Java-side exceptions must be caught too - confirm before
    narrowing any of them.
    """

    def __init__(self, name,host="pc14062.psi.ch", port=5566):
        """Create the device and set the default communication parameters."""
        TcpDevice.__init__(self, name, host, port)
        # Number of indexes scanned by _get_mnemonics() / _get_names().
        self.NUMBER_MNEMONICS = 100
        # Lazily filled caches, see get_mnemonics() / get_names().
        self._mnemonics = None
        self._names = None
        try:
            self.high_voltage = Channel("X11MA-ES1-PEEM:UMON", alias = "PEEM high voltage", monitored=True)
        except:
            # Best effort: the device stays usable without the HV channel.
            self.high_voltage = None
        self.debug = False
        self.retries = 1
        # Default timeout handed to sendReceive() - presumably milliseconds.
        self.timeout = 1000
        # NOTE(review): the original comments said "5min" for both values
        # below; if the unit is milliseconds, 600000 is 10 minutes - confirm.
        self.move_timeout = 600000
        self.homing_timeout = 600000

    def doInitialize(self):
        """Delegate initialization to the base class."""
        super(LEEM2000, self).doInitialize()

    def doUpdate(self):
        """Refresh the cache with "<high voltage> | <preset label>".

        The high voltage part is left empty when it cannot be read or is not
        a number; the label falls back to "unknown" (client connected) or
        "offline" when the query fails.
        """
        try:
            hv = self.get_high_voltage()
        except:
            hv = ""
        try:
            pl = self.get_preset_label()
        except:
            pl = "unknown" if self.client.isConnected() else "offline"
        # Format the voltage on its own: applying %1.3f to the "" fallback
        # (or to None) raises TypeError and used to abort the whole update.
        try:
            hv_text = "%1.3f" % hv
        except (TypeError, ValueError):
            hv_text = ""
        value = "%s | %s" % (hv_text, pl) if hv_text != "" or pl != "" else None
        self.setCache(value, None)

    def send_receive(self, tx, timeout = None, retries = None):
        """Send command tx and return the reply as a stripped string.

        timeout and retries default to self.timeout and self.retries.
        """
        if retries is None:
            retries = self.retries
        if timeout is None:
            timeout = self.timeout
        trailer = chr(0)
        ret = self.sendReceive(tx+trailer, None,trailer, 0, timeout, retries)
        # Drop the last character of the reply (the trailer) before stripping.
        return str(ret[0:-1]).strip()

    def get_value(self, name, timeout = None, retries = None):
        """Return the value of variable name as a float.

        Returns None when the reply cannot be parsed as a number; the raw
        reply is logged only when self.debug is set.
        """
        cmd = "get " + name
        ret = self.send_receive(cmd, timeout, retries)
        try:
            return float(ret)
        except (TypeError, ValueError):
            if self.debug:
                self.getLogger().info("Received invalid value for %s: %s" % (name, ret))
            return None

    def set_value(self, name, value, timeout = None, retries = None):
        """Write value to variable name; raise Exception unless the reply is '0'."""
        cmd = "set " + name + "=" + str(value)
        ret = self.send_receive(cmd, timeout, retries)
        if ret != '0':
            raise Exception("Error writing %s to %s: %s" % (str(value), name, ret))

    def get_mnemonic(self, index, timeout = None, retries = None):
        """Return the reply to "mne <index>"."""
        cmd = "mne " + str(index)
        return self.send_receive(cmd, timeout, retries)

    def _get_mnemonics(self, timeout = None, retries = None):
        """Query indexes 0..NUMBER_MNEMONICS-1 and keep the usable mnemonics."""
        ret = []
        for i in range(self.NUMBER_MNEMONICS):
            mne = self.get_mnemonic(i, timeout, retries)
            if mne not in [None, '', 'disabled', 'invalid']:
                ret.append(mne)
        return ret

    def get_mnemonics(self, timeout = None, retries = None):
        """Return the mnemonic list, querying the device only on first success.

        On failure a warning is logged and [] is returned without caching, so
        the next call tries again.
        """
        if self._mnemonics is None:
            try:
                self._mnemonics = self._get_mnemonics(timeout, retries)
            except:
                self.getLogger().warning("Error retrieving microscope mnemonics")
                return []
        return self._mnemonics

    def get_mnemonic_values(self, timeout = None, retries = None):
        """Return {mnemonic: value} using get_value() for every mnemonic."""
        ret = {}
        for mnemonic in self.get_mnemonics(timeout, retries):
            ret[mnemonic] = self.get_value(mnemonic, timeout, retries)
        return ret

    def get_name(self, index, timeout = None, retries = None):
        """Return the reply to "nam <index>"."""
        cmd = "nam " + str(index)
        return self.send_receive(cmd, timeout, retries)

    def _get_names(self, timeout = None, retries = None):
        """Query indexes 0..NUMBER_MNEMONICS-1 and keep the usable names.

        'Manip. X' and 'Manip. Y' are excluded in addition to the empty,
        'disabled' and 'invalid' replies.
        """
        ret = []
        for i in range(self.NUMBER_MNEMONICS):
            nam = self.get_name(i, timeout, retries)
            if nam not in [None, '', 'disabled', 'invalid', 'Manip. X', 'Manip. Y']:
                ret.append(nam)
        return ret

    def get_names(self, timeout = None, retries = None):
        """Return the name list, querying the device only on first success.

        On failure a warning is logged and [] is returned without caching.
        """
        if self._names is None:
            try:
                self._names = self._get_names(timeout = timeout, retries = retries)
            except:
                # Message used to say "mnemonics" (copied from get_mnemonics).
                self.getLogger().warning("Error retrieving microscope names")
                return []
        return self._names

    def get_values(self, timeout=None, retries=None):
        """Return {name: value} using get_value() for every name."""
        ret = {}
        # timeout/retries are forwarded to the name query as well, matching
        # get_mnemonic_values().
        for name in self.get_names(timeout, retries):
            ret[name] = self.get_value(name, timeout, retries)
        return ret

    def get_preset_label(self, timeout=None, retries=None):
        """Return the reply to "prl"."""
        cmd = "prl"
        return self.send_receive(cmd, timeout, retries)

    def get_high_voltage(self):
        """Return self.high_voltage.get(False).

        Raises AttributeError when the channel could not be created in
        __init__ (self.high_voltage is None); doUpdate() relies on that.
        """
        return self.high_voltage.get(False)

    def get_child(self, var, name=None, scale=None):
        """Create and initialize a LEEM2000Child for var (name defaults to var)."""
        if name is None:
            name=var
        ret = LEEM2000Child(name, var, self, scale)
        ret.initialize()
        return ret

    def get_positioner(self, var, name=None):
        """Create and initialize a LEEM2000Positioner for var (name defaults to var)."""
        if name is None:
            name=var
        ret = LEEM2000Positioner(name, var, self)
        ret.initialize()
        return ret

    def get_manip_motor(self, motor_id, encoder_index, name=None):
        """Create and initialize a LEEM2000ManipMotor (name defaults to str(motor_id))."""
        if name is None:
            name=str(motor_id)
        ret = LEEM2000ManipMotor(name, motor_id, encoder_index, self)
        ret.initialize()
        return ret

    def get_slit_motor(self, motor_id, encoder_index, name=None):
        """Create and initialize a LEEM2000SlitMotor (name defaults to str(motor_id))."""
        if name is None:
            name=str(motor_id)
        ret = LEEM2000SlitMotor(name, motor_id, encoder_index, self)
        ret.initialize()
        return ret

    def get_tilt(self, motor_id_pos, motor_id_neg, name=None):
        """Create and initialize a LEEM2000Tilt (name defaults to "<pos>_<neg>")."""
        if name is None:
            name=str(motor_id_pos) + "_" + str(motor_id_neg)
        ret = LEEM2000Tilt(name, motor_id_pos, motor_id_neg, self)
        ret.initialize()
        return ret

    def get_motor(self, motor_id, name=None):
        """Create and initialize a LEEM2000Motor (name defaults to str(motor_id))."""
        if name is None:
            name=str(motor_id)
        ret = LEEM2000Motor(name, motor_id, self)
        ret.initialize()
        return ret

    def get_manip_readback(self, timeout = None, retries = None):
        """Return the first two comma-separated fields of the "gmv" reply as floats."""
        # Query through this instance; the original went through the global
        # "microscope", which only works for the instance registered under
        # that name.
        ret = self.send_receive("gmv", timeout, retries)
        tokens = ret.split(",")
        return float(tokens[0]), float(tokens[1])

    #motor_id = U, D, L or R
    def move_tilt(self, motor_id, dist_us, timeout = None, retries = None):
        """Send "tlt <motor_id> <dist_us>"; raise Exception unless the reply is '0'.

        timeout defaults to self.move_timeout.
        """
        if timeout is None:
            timeout = self.move_timeout
        cmd = "tlt " + str(motor_id) + " " + str(dist_us)
        ret = self.send_receive(cmd, timeout, retries)
        if ret != '0':
            # Report the motor id, not the whole command string.
            raise Exception("Error moving tilt %s to %s: %s" % (str(motor_id), dist_us, ret))

    #Direction H or V
    def home_tilt(self, direction, timeout = None, retries = None):
        """Send "tlt <direction>"; raise Exception unless the reply is '0'.

        timeout defaults to self.homing_timeout.
        """
        if timeout is None:
            timeout = self.homing_timeout
        cmd = "tlt " + str(direction)
        ret = self.send_receive(cmd, timeout, retries)
        if ret != '0':
            raise Exception("Error homing tilt %s: %s" % (str(cmd), ret))

    #motor_id = 11 (X) or 10(Y)
    def move_motor_rel(self, motor_id, dist_us, timeout = None, retries = None):
        """Send "mmd <motor_id> <dist_us>"; raise Exception unless the reply is '0'.

        timeout defaults to self.move_timeout.
        """
        if timeout is None:
            timeout = self.move_timeout
        cmd = "mmd " + str(motor_id) + " " + str(dist_us)
        ret = self.send_receive(cmd, timeout, retries)
        if ret != '0':
            raise Exception("Error relative moving motor %s to %s: %s" % (str(motor_id), dist_us, ret))

    def stop_motor(self, motor_id, timeout = None, retries = None):
        """Stop a motor by requesting a relative move of 0.0."""
        # Forward the caller's timeout/retries; they used to be replaced by None.
        self.move_motor_rel(motor_id, 0.0, timeout = timeout, retries = retries)

    def is_motor_moving(self, motor_id, timeout = None, retries = None):
        """Return True if the motor reports moving ('1'), False if not ('0').

        Raises Exception for any other reply.
        """
        # NOTE(review): 888888.0 is presumably a sentinel distance that turns
        # the "mmd" command into a status query - confirm against the server.
        cmd = "mmd " + str(motor_id) + " 888888.0"
        # The command was previously never sent, leaving "ret" undefined.
        ret = self.send_receive(cmd, timeout, retries)
        if ret == '0':
            return False
        if ret == '1':
            return True
        raise Exception("Error reading motor %s state: %s" % (str(motor_id), ret))

    #motor_id = 11 (X) or 10(Y)
    def move_motor(self, motor_id, pos_us, timeout = None, retries = None):
        """Send "mmp <motor_id> <pos_us>"; raise Exception unless the reply is '0'.

        timeout defaults to self.move_timeout.
        """
        if timeout is None:
            timeout = self.move_timeout
        cmd = "mmp " + str(motor_id) + " " + str(pos_us)
        ret = self.send_receive(cmd, timeout, retries)
        if ret != '0':
            raise Exception("Error moving motor %s to %s: %s" % (str(motor_id), pos_us, ret))
class LEEM2000Child(RegisterBase):
    """Register mapped onto one microscope variable, with an optional scale.

    Reads return the device value multiplied by scale; writes send the value
    divided by scale. Without a scale the value passes through unchanged.
    """

    def __init__(self,name, var, microscope, scale=None):
        RegisterBase.__init__(self, name)
        self.microscope = microscope
        self.var = var
        self.scale = scale

    def doRead(self):
        """Return the (scaled) value of the variable, or None if unavailable."""
        raw = self.microscope.get_value(self.var)
        if raw is None or self.scale is None:
            return raw
        return float(raw) * self.scale

    def doWrite(self, val):
        """Write val to the variable, converting it back to device units."""
        # NOTE(review): the write is issued even when val is None, mirroring
        # the structure of doRead - confirm against the original indentation.
        if val is not None and self.scale is not None:
            val = float(val) / self.scale
        self.microscope.set_value(self.var, val)
class LEEM2000Positioner (PositionerBase):
    """Positioner backed by a single microscope variable.

    The setpoint is kept locally: it is seeded from the readback on the first
    read and afterwards only changes when a write succeeds.
    """

    def __init__(self, name, var, microscope):
        PositionerBase.__init__(self, name, PositionerConfig())
        self.microscope = microscope
        self.var = var
        self.setpoint = None

    def doRead(self):
        """Return the remembered setpoint, seeding it from the readback once."""
        if self.setpoint is not None:
            return self.setpoint
        self.setpoint = self.doReadReadback()
        return self.setpoint

    def doWrite(self, val):
        """Write val to the variable and remember it as the new setpoint."""
        self.microscope.set_value(self.var, val)
        # Only reached when set_value did not raise.
        self.setpoint = val

    def doReadReadback(self):
        """Return the current value of the variable from the microscope."""
        return self.microscope.get_value(self.var)
class LEEM2000ManipMotor(PositionerBase):
    """Manipulator axis moved with move_motor() and read from the manip readback.

    encoder_index selects which element of get_manip_readback() belongs to
    this axis.
    """

    def __init__(self,name, motor_id, encoder_index, microscope):
        PositionerBase.__init__(self, name, PositionerConfig())
        self.microscope = microscope
        self.motor_id = motor_id
        self.encoder_index = encoder_index
        self.pos = None

    def doRead(self):
        """Return the cached position, refreshing it from the readback if unset."""
        if self.pos is not None:
            return self.pos
        self.pos = self.doReadReadback()
        return self.pos

    def doWrite(self, val):
        """Move the motor to val and drop the cached position."""
        self.microscope.move_motor(self.motor_id, val)
        # Force the next doRead() to query the device again.
        self.pos = None

    def doReadReadback(self):
        """Return this axis' readback multiplied by 1000.0."""
        # presumably a mm -> um conversion (the original local was named
        # pos_mm) - confirm
        readback = self.microscope.get_manip_readback()
        return readback[self.encoder_index] * 1000.0
class LEEM2000SlitMotor(PositionerBase):
    """Slit axis moved with move_motor() and read back from a microscope variable.

    The readback variable is addressed by str(encoder_index).
    """

    def __init__(self, name, motor_id, encoder_index, microscope):
        PositionerBase.__init__(self, name, PositionerConfig())
        self.microscope = microscope
        self.motor_id = motor_id
        self.encoder_index = encoder_index
        self.pos = None

    def doRead(self):
        """Return the cached position, refreshing it from the readback if unset."""
        if self.pos is not None:
            return self.pos
        self.pos = float(self.doReadReadback())
        return self.pos

    def doWrite(self, val):
        """Move the motor to val and drop the cached position."""
        self.microscope.move_motor(self.motor_id, val)
        # Force the next doRead() to query the device again.
        self.pos = None

    def doReadReadback(self):
        """Return the readback variable multiplied by 0.700."""
        # NOTE(review): 0.700 is presumably a step -> position conversion
        # factor (the original local was named pos_step) - confirm.
        steps = self.microscope.get_value(str(self.encoder_index))
        return steps * 0.700
class LEEM2000Motor(PositionerBase):
    """Motor without readback: the position is whatever was written last."""

    def __init__(self,name, motor_id, microscope):
        PositionerBase.__init__(self, name, PositionerConfig())
        self.microscope = microscope
        self.motor_id = motor_id
        # Stays None until the first successful doWrite().
        self.pos = None

    def doRead(self):
        """Return the last written position; the device is not queried."""
        return self.pos

    def doWrite(self, val):
        """Move the motor to val and remember val as the position."""
        self.microscope.move_motor(self.motor_id, val)
        self.pos = val
class LEEM2000Tilt(RegisterBase):
def __init__(self,name, motor_id_pos, motor_id_neg, microscope):
RegisterBase.__init__(self,name)
self.motor_id_pos = motor_id_pos
self.motor_id_neg = motor_id_neg
self.microscope = microscope
self.pos = None
def doRead(self):
return self.pos
def doWrite(self, val):
if (val>0):
print "SET TILT ", self.motor_id_pos, " -> ", val
self.microscope.move_tilt(self.motor_id_pos, val)
elif (val<0):
print "SET TILT ", self.motor_id_neg, " -> ", -val
self.microscope.move_tilt(self.motor_id_neg, -val)
self.pos = val
# Create the microscope driver and register it together with the devices
# derived from it.
# NOTE(review): the lines below use `microscope`, `tilt_v`, ... as globals
# right after registering devices with those names - presumably add_device
# publishes each device under its name in the script namespace; confirm.
add_device (LEEM2000("microscope"), True)
# Manipulator axes: (motor id, index into the get_manip_readback() pair, name).
add_device (microscope.get_manip_motor(11, 0, "manip_x"), True)
add_device (microscope.get_manip_motor(10, 1, "manip_y"), True)
# Tilt registers: (motor id for positive values, motor id for negative values, name).
add_device (microscope.get_tilt('L','R', "tilt_h"), True)
add_device (microscope.get_tilt('D','U', "tilt_v"), True)
# Registers mapped onto microscope variables: (variable, name[, scale]).
add_device (microscope.get_child("MOBJ","objective"), True)
add_device (microscope.get_child("OSTIGA","obj_stig_a"), True)
add_device (microscope.get_child("OSTIGB","obj_stig_b"), True)
# Reads are multiplied by 0.01794 and writes divided by it (see LEEM2000Child).
add_device (microscope.get_child("MDRIVE","azimuth_rot", 0.01794), True)
add_device (microscope.get_child("BOMBV","bv"), True)
add_device (microscope.get_child("FIL","fil"), True)
add_device (microscope.get_child("STV","start_voltage"), True)
add_device (microscope.get_child("OBJDX","obj_align_x"), True)
add_device (microscope.get_child("OBJDY","obj_align_y"), True)
add_device (microscope.get_child("HMOTSLIT","slit_motion_pos"), True)
#add_device (microscope.get_child("76","slit_motion_pos"), True)
#add_device (microscope.get_child("HMOTCAX","ca_motion_pos"), True)
#add_device (microscope.get_child("HMOTCAY","ca_correction_pos"), True)
# Motors without readback (LEEM2000Motor): reads return the last written value.
add_device (microscope.get_motor(1, "ca_correction"), True)
add_device (microscope.get_motor(2, "ca_motion"), True)
# Slit motor 7; its readback comes from variable "76" (see LEEM2000SlitMotor).
add_device (microscope.get_slit_motor(7, 76, "slit_motion"), True)
# Polling interval for the microscope device - presumably milliseconds; confirm.
microscope.setPolling(5000)
class TiltMotor(RegisterBase):
    """Absolute-position facade over a relative tilt register.

    The position is tracked in software only: it starts at 0, and each write
    sends the difference to the previous position to the wrapped tilt_motor.
    """

    def __init__(self, name, tilt_motor):
        RegisterBase.__init__(self, name)
        self.tilt_motor = tilt_motor
        self.position = 0

    def doInitialize(self):
        """Reset the tracked position to 0."""
        self.position = 0

    def doRead(self):
        """Return the tracked position; the device is not queried."""
        return self.position

    def doWrite(self, pos):
        """Move to pos by writing the offset from the tracked position.

        Raises Exception when abs(pos) is greater than 10000.
        """
        if abs(pos)>10000:
            raise Exception("Exceeded device range")
        self.tilt_motor.write(pos - self.position)
        # Only reached when the relative move did not raise.
        self.position = pos
class Fov(ReadonlyRegisterBase):
    """Read-only register derived from the microscope preset label.

    The value is the preset label with everything from its last ":" onwards
    removed; labels without ":" are returned unchanged.
    """

    def __init__(self, name, mic):
        ReadonlyRegisterBase.__init__(self, name)
        self.mic = mic

    def doInitialize(self):
        # Kept from the original; nothing in this class reads self.position.
        self.position = 0

    def doRead(self):
        """Return the preset label up to (not including) its last ":"."""
        label = self.mic.get_preset_label()
        if label is None or ":" not in label:
            return label
        return label.rsplit(":", 1)[0]
def init_tilt():
    """Call initialize() on tilt_vertical, then on tilt_horizontal.

    NOTE(review): presumably this resets both tracked positions to 0 through
    TiltMotor.doInitialize - confirm that initialize() invokes it.
    """
    for axis in (tilt_vertical, tilt_horizontal):
        axis.initialize()
# Absolute-position wrappers (TiltMotor) around the relative tilt registers
# tilt_v / tilt_h, plus the field-of-view register read from the preset label.
add_device(TiltMotor("tilt_vertical", tilt_v), True)
add_device(TiltMotor("tilt_horizontal", tilt_h), True)
add_device(Fov("fov", microscope), True)
# Per-device polling intervals - presumably milliseconds; confirm.
tilt_vertical.polling=500
tilt_horizontal.polling=500
manip_x.polling=500
manip_y.polling=500
fov.polling=5000
# NOTE(review): presumably the number of decimals kept for azimuth_rot - confirm.
azimuth_rot.precision=1
azimuth_rot.polling=5000
bv.polling=1000
fil.polling=1000
slit_motion.polling=1000
#Create a listener to the sensor, verifying the readback values.
class ListenerAzimuth (DeviceListener):
def onCacheChanged(self, device, value, former, timestamp,value_changed):
if (device is azimuth_rot) and (value is not None):
if value_changed:
if self.debug:
print "Azimuth angle changed: " + str(value)
try:
set_azimuth_rot(value)
except:
if self.debug:
log(str(sys.exc_info()[1]))
# Instantiate the azimuth listener, enable its debug output (print/log calls
# in onCacheChanged) and attach it to the azimuth_rot register.
listenerAzimuth = ListenerAzimuth()
listenerAzimuth.debug=True
azimuth_rot.addListener(listenerAzimuth)