import ch.psi.pshell.serial.TcpDevice as TcpDevice


class LEEM2000(TcpDevice):
    """TCP client for a LEEM2000 microscope control server.

    Commands are sent as text terminated by a NUL character (chr(0)) and the
    reply is read up to the same terminator. The commands used here are
    "get <name>", "set <name>=<value>", "mne <index>", "nam <index>" and
    "prl" (preset label).

    NOTE(review): Channel, RegisterBase and add_device are not imported in
    this file; they are presumably injected by the scripting environment.
    """

    # Replies that mark an unused slot when scanning mnemonics or names.
    _INVALID_ENTRIES = (None, '', 'disabled', 'invalid')

    def __init__(self, name, host="pc14062.psi.ch", port=5566):
        """Create the device and the channel used to read the high voltage."""
        TcpDevice.__init__(self, name, host, port)
        # Number of indices scanned by the "mne" and "nam" queries.
        self.NUMBER_MNEMONICS = 100
        # Lazily filled caches; None means "not retrieved yet".
        self._mnemonics = None
        self._names = None
        self.high_voltage = Channel("X11MA-ES1-PEEM:UMON", alias = "PEEM high voltage", monitored=True)
        # When True, unparsable replies in get_value are logged.
        self.debug = False

    def doInitialize(self):
        """Delegate initialization to the parent class."""
        super(LEEM2000, self).doInitialize()

    def doUpdate(self):
        """Refresh the cached value with "<high voltage> | <preset label>".

        If the high voltage cannot be read its part is left empty; if the
        preset label cannot be read it is replaced by "unknown" (client
        connected) or "offline" (client not connected).
        """
        # Bare excepts are kept on purpose: this presumably runs under
        # Jython, where "except Exception" would not catch Java exceptions.
        try:
            hv = self.get_high_voltage()
        except:
            hv = None
        try:
            pl = self.get_preset_label()
        except:
            # Use this instance rather than the module-level global so the
            # class works regardless of the name it is registered under.
            pl = "unknown" if self.client.isConnected() else "offline"
        # Only apply the float format when a reading is available; formatting
        # the empty placeholder with %1.3f would raise TypeError.
        if hv is None or hv == "":
            hv_text = ""
        else:
            hv_text = "%1.3f" % (hv,)
        if hv_text != "" or pl != "":
            value = "%s | %s" % (hv_text, pl)
        else:
            value = None
        self.setCache(value, None)

    def send_receive(self, tx, timeout = 1000, retries = 1):
        """Send a NUL-terminated command and return the stripped reply.

        Args:
            tx: command text, without terminator.
            timeout: forwarded to sendReceive (default 1000).
            retries: forwarded to sendReceive (default 1).

        Returns:
            The reply as str, with its last character (the terminator)
            removed and surrounding whitespace stripped.
        """
        trailer = chr(0)
        ret = self.sendReceive(tx + trailer, None, trailer, 0, timeout, retries)
        return str(ret[0:-1]).strip()

    def get_value(self, name, timeout = 1000, retries = 1):
        """Query "get <name>" and return the reply as float.

        Returns None when the reply cannot be parsed as a float (logged only
        if self.debug is set).
        """
        cmd = "get " + name
        ret = self.send_receive(cmd, timeout, retries)
        try:
            return float(ret)
        except (TypeError, ValueError):
            if self.debug:
                self.getLogger().info("Received invalid value for %s: %s" % (name, ret))
            return None

    def set_value(self, name, value, timeout = 1000, retries = 1):
        """Send "set <name>=<value>".

        Raises:
            Exception: if the reply is anything other than '0'.
        """
        cmd = "set " + name + "=" + str(value)
        ret = self.send_receive(cmd, timeout, retries)
        if ret != '0':
            raise Exception("Error writing %s to %s: %s" % (str(value), name, ret))

    def _scan_entries(self, getter, timeout, retries):
        """Call getter(index, timeout, retries) for every index and keep valid replies."""
        entries = []
        for index in range(self.NUMBER_MNEMONICS):
            entry = getter(index, timeout, retries)
            if entry not in self._INVALID_ENTRIES:
                entries.append(entry)
        return entries

    def get_mnemonic(self, index, timeout = 1000, retries = 1):
        """Return the reply to "mne <index>"."""
        cmd = "mne " + str(index)
        return self.send_receive(cmd, timeout, retries)

    def _get_mnemonics(self, timeout = 1000, retries = 1):
        """Scan all indices and return the list of valid mnemonics."""
        return self._scan_entries(self.get_mnemonic, timeout, retries)

    def get_mnemonics(self, timeout = 1000, retries = 1):
        """Return the mnemonic list, retrieving and caching it on first use.

        On failure a warning is logged, nothing is cached and an empty list
        is returned, so the next call retries.
        """
        if self._mnemonics is None:
            try:
                self._mnemonics = self._get_mnemonics(timeout, retries)
            except:
                self.getLogger().warning("Error retrieving microscope mnemonics")
                return []
        return self._mnemonics

    def get_mnemonic_values(self, timeout = 1000, retries = 1):
        """Return a dict mapping each mnemonic to its get_value() result."""
        ret = {}
        for mnemonic in self.get_mnemonics(timeout, retries):
            ret[mnemonic] = self.get_value(mnemonic, timeout, retries)
        return ret

    def get_name(self, index, timeout = 1000, retries = 1):
        """Return the reply to "nam <index>"."""
        cmd = "nam " + str(index)
        return self.send_receive(cmd, timeout, retries)

    def _get_names(self, timeout = 1000, retries = 1):
        """Scan all indices and return the list of valid names."""
        return self._scan_entries(self.get_name, timeout, retries)

    def get_names(self, timeout = 1000, retries = 1):
        """Return the name list, retrieving and caching it on first use.

        On failure a warning is logged, nothing is cached and an empty list
        is returned, so the next call retries.
        """
        if self._names is None:
            try:
                self._names = self._get_names(timeout, retries)
            except:
                self.getLogger().warning("Error retrieving microscope names")
                return []
        return self._names

    def get_values(self, timeout=1000, retries=1):
        """Return a dict mapping each name to its get_value() result."""
        ret = {}
        for name in self.get_names(timeout, retries):
            ret[name] = self.get_value(name, timeout, retries)
        return ret

    def get_preset_label(self, timeout=1000, retries=1):
        """Return the reply to the "prl" command."""
        cmd = "prl"
        return self.send_receive(cmd, timeout, retries)

    def get_high_voltage(self):
        """Read the high-voltage channel via high_voltage.get(False)."""
        return self.high_voltage.get(False)

    def get_child(self, var, name=None):
        """Create and initialize a LEEM2000Child bound to variable var.

        Args:
            var: microscope variable read and written by the child.
            name: device name of the child; defaults to var.
        """
        if name is None:
            name = var
        ret = LEEM2000Child(name, var, self)
        ret.initialize()
        return ret


class LEEM2000Child(RegisterBase):
    """Register exposing a single microscope variable of a LEEM2000 device."""

    def __init__(self, name, var, microscope):
        """Store the variable name and the parent microscope device."""
        RegisterBase.__init__(self, name)
        self.var = var
        self.microscope = microscope

    def doRead(self):
        """Read the variable through the parent's get_value()."""
        return self.microscope.get_value(self.var)

    def doWrite(self, val):
        """Write the variable through the parent's set_value()."""
        self.microscope.set_value(self.var, val)


# Register the device under the name "microscope" and poll it every 5000
# (presumably milliseconds -- TODO confirm).
add_device (LEEM2000("microscope"), True)
microscope.setPolling(5000)