result from merge with gerrit

drivers in secop_psi

Change-Id: I7fd8312b11f365b423e66b2417b9e54ec6558a11
This commit is contained in:
zolliker 2022-03-08 08:35:41 +01:00
parent bd246c5ca7
commit 9320541754
3 changed files with 257 additions and 320 deletions

View File

@ -28,15 +28,16 @@ class Ls370Sim(Communicator):
('RDGR?%d', '1.0'), ('RDGR?%d', '1.0'),
('RDGST?%d', '0'), ('RDGST?%d', '0'),
('RDGRNG?%d', '0,5,5,0,0'), ('RDGRNG?%d', '0,5,5,0,0'),
('INSET?%d', '1,3,3,0,0'), ('INSET?%d', '1,5,5,0,0'),
('FILTER?%d', '1,1,80'), ('FILTER?%d', '1,5,80'),
] ]
OTHER_COMMANDS = [ OTHER_COMMANDS = [
('*IDN?', 'LSCI,MODEL370,370184,05302003'), ('*IDN?', 'LSCI,MODEL370,370184,05302003'),
('SCAN?', '3,0'), ('SCAN?', '3,1'),
] ]
def earlyInit(self): def earlyInit(self):
super().earlyInit()
self._data = dict(self.OTHER_COMMANDS) self._data = dict(self.OTHER_COMMANDS)
for fmt, v in self.CHANNEL_COMMANDS: for fmt, v in self.CHANNEL_COMMANDS:
for chan in range(1,17): for chan in range(1,17):
@ -44,6 +45,7 @@ class Ls370Sim(Communicator):
# mkthread(self.run) # mkthread(self.run)
def communicate(self, command): def communicate(self, command):
self.comLog('> %s' % command)
# simulation part, time independent # simulation part, time independent
for channel in range(1,17): for channel in range(1,17):
_, _, _, _, excoff = self._data['RDGRNG?%d' % channel].split(',') _, _, _, _, excoff = self._data['RDGRNG?%d' % channel].split(',')
@ -68,6 +70,6 @@ class Ls370Sim(Communicator):
if qcmd in self._data: if qcmd in self._data:
self._data[qcmd] = arg self._data[qcmd] = arg
break break
#if command.startswith('R'): reply = ';'.join(reply)
# print('> %s\t< %s' % (command, reply)) self.comLog('< %s' % reply)
return ';'.join(reply) return reply

View File

@ -33,17 +33,17 @@ Polling of value and status is done commonly for all modules. For each registere
import threading import threading
import time import time
from ast import literal_eval # convert string as comma separated numbers into tuple
import secop.iohandler
from secop.datatypes import BoolType, EnumType, \ from secop.datatypes import BoolType, EnumType, \
FloatRange, IntRange, StatusType, StringType FloatRange, IntRange, StatusType, StringType
from secop.errors import HardwareError from secop.errors import HardwareError
from secop.lib import clamp from secop.lib import clamp
from secop.lib.enum import Enum from secop.lib.enum import Enum
from secop.modules import Attached, Communicator, Done, \ from secop.modules import Communicator, Done, \
Drivable, Parameter, Property, Readable Drivable, Parameter, Property, Readable
from secop.poller import Poller from secop.io import HasIO
from secop.io import HasIodev from secop.rwhandler import CommonReadHandler, CommonWriteHandler
try: try:
import secop_psi.ppmswindows as ppmshw import secop_psi.ppmswindows as ppmshw
@ -52,28 +52,11 @@ except ImportError:
import secop_psi.ppmssim as ppmshw import secop_psi.ppmssim as ppmshw
class IOHandler(secop.iohandler.IOHandler):
"""IO handler for PPMS commands
deals with typical format:
- query command: ``<command>?``
- reply: ``<value1>,<value2>, ..``
- change command: ``<command> <value1>,<value2>,...``
"""
CMDARGS = ['no'] # the channel number is needed in channel commands
CMDSEPARATOR = None # no command chaining
def __init__(self, name, querycmd, replyfmt):
changecmd = querycmd.split('?')[0] + ' '
super().__init__(name, querycmd, replyfmt, changecmd)
class Main(Communicator): class Main(Communicator):
"""ppms communicator module""" """ppms communicator module"""
pollinterval = Parameter('poll interval', FloatRange(), readonly=False, default=2) pollinterval = Parameter('poll interval', FloatRange(), readonly=False, default=2)
data = Parameter('internal', StringType(), poll=True, export=False, # export for test only data = Parameter('internal', StringType(), export=True, # export for test only
default="", readonly=True) default="", readonly=True)
class_id = Property('Quantum Design class id', StringType(), export=False) class_id = Property('Quantum Design class id', StringType(), export=False)
@ -86,8 +69,6 @@ class Main(Communicator):
_channel_to_index = dict(((channel, i) for i, channel in enumerate(_channel_names))) _channel_to_index = dict(((channel, i) for i, channel in enumerate(_channel_names)))
_status_bitpos = {'temp': 0, 'field': 4, 'chamber': 8, 'position': 12} _status_bitpos = {'temp': 0, 'field': 4, 'chamber': 8, 'position': 12}
pollerClass = Poller
def earlyInit(self): def earlyInit(self):
super().earlyInit() super().earlyInit()
self.modules = {} self.modules = {}
@ -100,11 +81,14 @@ class Main(Communicator):
def communicate(self, command): def communicate(self, command):
"""GPIB command""" """GPIB command"""
with self.lock: with self.lock:
self.log.debug('> %s' % command) self.comLog('> %s' % command)
reply = self._ppms_device.send(command) reply = self._ppms_device.send(command)
self.log.debug('< %s' % reply) self.comLog("< %s", reply)
return reply return reply
def doPoll(self):
self.read_data()
def read_data(self): def read_data(self):
mask = 1 # always get packed_status mask = 1 # always get packed_status
for channelname, channel in self.modules.items(): for channelname, channel in self.modules.items():
@ -130,23 +114,27 @@ class Main(Communicator):
return data # return data as string return data # return data as string
class PpmsMixin: class PpmsBase(HasIO, Readable):
"""common base for all ppms modules""" """common base for all ppms modules"""
value = Parameter(needscfg=False)
status = Parameter(needscfg=False)
iodev = Attached()
pollerClass = Poller
enabled = True # default, if no parameter enable is defined enabled = True # default, if no parameter enable is defined
_last_settings = None # used by several modules _last_settings = None # used by several modules
slow_pollfactor = 1 slow_pollfactor = 1
# as this pollinterval affects only the polling of settings # as this pollinterval affects only the polling of settings
# it would be confusing to export it. # it would be confusing to export it.
pollinterval = Parameter('', FloatRange(), needscfg=False, export=False) pollinterval = Parameter(export=False)
def initModule(self): def initModule(self):
super().initModule() super().initModule()
self._iodev.register(self) self.io.register(self)
def doPoll(self):
# polling is done by the main module
# and PPMS does not deliver really more fresh values when polled more often
pass
def update_value_status(self, value, packed_status): def update_value_status(self, value, packed_status):
# update value and status # update value and status
@ -160,12 +148,18 @@ class PpmsMixin:
self.value = value self.value = value
self.status = (self.Status.IDLE, '') self.status = (self.Status.IDLE, '')
def comm_write(self, command):
"""write command and check if result is OK"""
reply = self.communicate(command)
if reply != 'OK':
raise HardwareError('bad reply %r to command %r' % (reply, command))
class Channel(PpmsMixin, HasIodev, Readable):
class Channel(PpmsBase):
"""channel base class""" """channel base class"""
value = Parameter('main value of channels', poll=False, needscfg=False) value = Parameter('main value of channels')
enabled = Parameter('is this channel used?', readonly=False, poll=False, enabled = Parameter('is this channel used?', readonly=False,
datatype=BoolType(), default=False) datatype=BoolType(), default=False)
channel = Property('channel name', channel = Property('channel name',
@ -178,22 +172,17 @@ class Channel(PpmsMixin, HasIodev, Readable):
if not self.channel: if not self.channel:
self.channel = self.name self.channel = self.name
def get_settings(self, pname):
return ''
class UserChannel(Channel): class UserChannel(Channel):
"""user channel""" """user channel"""
# pollinterval = Parameter(visibility=3)
no = Property('channel number', no = Property('channel number',
datatype=IntRange(0, 0), export=False, default=0) datatype=IntRange(0, 0), export=False, default=0)
linkenable = Property('name of linked channel for enabling', linkenable = Property('name of linked channel for enabling',
datatype=StringType(), export=False, default='') datatype=StringType(), export=False, default='')
def write_enabled(self, enabled): def write_enabled(self, enabled):
other = self._iodev.modules.get(self.linkenable, None) other = self.io.modules.get(self.linkenable, None)
if other: if other:
other.enabled = enabled other.enabled = enabled
return enabled return enabled
@ -202,201 +191,172 @@ class UserChannel(Channel):
class DriverChannel(Channel): class DriverChannel(Channel):
"""driver channel""" """driver channel"""
drvout = IOHandler('drvout', 'DRVOUT? %(no)d', '%d,%g,%g') current = Parameter('driver current', readonly=False,
current = Parameter('driver current', readonly=False, handler=drvout,
datatype=FloatRange(0., 5000., unit='uA')) datatype=FloatRange(0., 5000., unit='uA'))
powerlimit = Parameter('power limit', readonly=False, handler=drvout, powerlimit = Parameter('power limit', readonly=False,
datatype=FloatRange(0., 1000., unit='uW')) datatype=FloatRange(0., 1000., unit='uW'))
# pollinterval = Parameter(visibility=3)
def analyze_drvout(self, no, current, powerlimit): param_names = 'current', 'powerlimit'
@CommonReadHandler(param_names)
def read_params(self):
no, self.current, self.powerlimit = literal_eval(
self.communicate('DRVOUT? %d' % self.no))
if self.no != no: if self.no != no:
raise HardwareError('DRVOUT command: channel number in reply does not match') raise HardwareError('DRVOUT command: channel number in reply does not match')
return dict(current=current, powerlimit=powerlimit)
def change_drvout(self, change): @CommonWriteHandler(param_names)
change.readValues() def write_params(self, values):
return change.current, change.powerlimit """write parameters
:param values: a dict like object containing the parameters to be written
"""
self.read_params() # make sure parameters are up to date
self.comm_write('DRVOUT %(no)d,%(current)g,%(powerlimit)g' % values)
self.read_params() # read back
class BridgeChannel(Channel): class BridgeChannel(Channel):
"""bridge channel""" """bridge channel"""
bridge = IOHandler('bridge', 'BRIDGE? %(no)d', '%d,%g,%g,%d,%d,%g') excitation = Parameter('excitation current', readonly=False,
# pylint: disable=invalid-name
ReadingMode = Enum('ReadingMode', standard=0, fast=1, highres=2)
enabled = Parameter(handler=bridge)
excitation = Parameter('excitation current', readonly=False, handler=bridge,
datatype=FloatRange(0.01, 5000., unit='uA')) datatype=FloatRange(0.01, 5000., unit='uA'))
powerlimit = Parameter('power limit', readonly=False, handler=bridge, powerlimit = Parameter('power limit', readonly=False,
datatype=FloatRange(0.001, 1000., unit='uW')) datatype=FloatRange(0.001, 1000., unit='uW'))
dcflag = Parameter('True when excitation is DC (else AC)', readonly=False, handler=bridge, dcflag = Parameter('True when excitation is DC (else AC)', readonly=False,
datatype=BoolType()) datatype=BoolType())
readingmode = Parameter('reading mode', readonly=False, handler=bridge, readingmode = Parameter('reading mode', readonly=False,
datatype=EnumType(ReadingMode)) datatype=EnumType(standard=0, fast=1, highres=2))
voltagelimit = Parameter('voltage limit', readonly=False, handler=bridge, voltagelimit = Parameter('voltage limit', readonly=False,
datatype=FloatRange(0.0001, 100., unit='mV')) datatype=FloatRange(0.0001, 100., unit='mV'))
# pollinterval = Parameter(visibility=3)
def analyze_bridge(self, no, excitation, powerlimit, dcflag, readingmode, voltagelimit): param_names = 'enabled', 'enabled', 'powerlimit', 'dcflag', 'readingmode', 'voltagelimit'
@CommonReadHandler(param_names)
def read_params(self):
no, excitation, powerlimit, self.dcflag, self.readingmode, voltagelimit = literal_eval(
self.communicate('BRIDGE? %d' % self.no))
if self.no != no: if self.no != no:
raise HardwareError('DRVOUT command: channel number in reply does not match') raise HardwareError('DRVOUT command: channel number in reply does not match')
return dict( self.enabled = excitation != 0 and powerlimit != 0 and voltagelimit != 0
enabled=excitation != 0 and powerlimit != 0 and voltagelimit != 0, if excitation:
excitation=excitation or self.excitation, self.excitation = excitation
powerlimit=powerlimit or self.powerlimit, if powerlimit:
dcflag=dcflag, self.powerlimit = powerlimit
readingmode=readingmode, if voltagelimit:
voltagelimit=voltagelimit or self.voltagelimit, self.voltagelimit = voltagelimit
)
def change_bridge(self, change): @CommonWriteHandler(param_names)
change.readValues() def write_params(self, values):
if change.enabled: """write parameters
return self.no, change.excitation, change.powerlimit, change.dcflag, change.readingmode, change.voltagelimit
return self.no, 0, 0, change.dcflag, change.readingmode, 0 :param values: a dict like object containing the parameters to be written
"""
self.read_params() # make sure parameters are up to date
if not values['enabled']:
values['excitation'] = 0
values['powerlimit'] = 0
values['voltagelimit'] = 0
self.comm_write('BRIDGE %(no)d,%(enabled)g,%(powerlimit)g,%(dcflag)d,'
'%(readingmode)d,%(voltagelimit)g' % values)
self.read_params() # read back
class Level(PpmsMixin, HasIodev, Readable): class Level(PpmsBase):
"""helium level""" """helium level"""
level = IOHandler('level', 'LEVEL?', '%g,%d') value = Parameter(datatype=FloatRange(unit='%'))
value = Parameter(datatype=FloatRange(unit='%'), handler=level)
status = Parameter(handler=level)
# pollinterval = Parameter(visibility=3)
channel = 'level' channel = 'level'
def doPoll(self):
self.read_value()
def update_value_status(self, value, packed_status): def update_value_status(self, value, packed_status):
pass pass
# must be a no-op # must be a no-op
# when called from Main.read_data, value is always None # when called from Main.read_data, value is always None
# value and status is polled via settings # value and status is polled via settings
def analyze_level(self, level, status): def read_value(self):
# ignore 'old reading' state of the flag, as this happens only for a short time # ignore 'old reading' state of the flag, as this happens only for a short time
# during measuring return literal_eval(self.communicate('LEVEL?'))[0]
return dict(value=level, status=(self.Status.IDLE, ''))
class Chamber(PpmsMixin, HasIodev, Drivable): class Chamber(PpmsBase, Drivable):
"""sample chamber handling """sample chamber handling
value is an Enum, which is redundant with the status text value is an Enum, which is redundant with the status text
""" """
chamber = IOHandler('chamber', 'CHAMBER?', '%d')
Status = Drivable.Status Status = Drivable.Status
# pylint: disable=invalid-name code_table = [
Operation = Enum( # valuecode, status, statusname, opcode, targetname
'Operation', (0, Status.IDLE, 'unknown', 10, 'noop'),
seal_immediately=0, (1, Status.IDLE, 'purged_and_sealed', 1, 'purge_and_seal'),
purge_and_seal=1, (2, Status.IDLE, 'vented_and_sealed', 2, 'vent_and_seal'),
vent_and_seal=2, (3, Status.WARN, 'sealed_unknown', 0, 'seal_immediately'),
pump_continuously=3, (4, Status.BUSY, 'purge_and_seal', None, None),
vent_continuously=4, (5, Status.BUSY, 'vent_and_seal', None, None),
hi_vacuum=5, (6, Status.BUSY, 'pumping_down', None, None),
noop=10, (8, Status.IDLE, 'pumping_continuously', 3, 'pump_continuously'),
) (9, Status.IDLE, 'venting_continuously', 4, 'vent_continuously'),
StatusCode = Enum( (15, Status.ERROR, 'general_failure', None, None),
'StatusCode', ]
unknown=0, value_codes = {k: v for v, _, k, _, _ in code_table}
purged_and_sealed=1, target_codes = {k: v for v, _, _, _, k in code_table if k}
vented_and_sealed=2, name2opcode = {k: v for _, _, _, v, k in code_table if k}
sealed_unknown=3, opcode2name = {v: k for _, _, _, v, k in code_table if k}
purge_and_seal=4, status_map = {v: (c, k.replace('_', ' ')) for v, c, k, _, _ in code_table}
vent_and_seal=5, value = Parameter(description='chamber state', datatype=EnumType(**value_codes), default=0)
pumping_down=6, target = Parameter(description='chamber command', datatype=EnumType(**target_codes), default='noop')
at_hi_vacuum=7,
pumping_continuously=8,
venting_continuously=9,
general_failure=15,
)
value = Parameter(description='chamber state', handler=chamber,
datatype=EnumType(StatusCode))
target = Parameter(description='chamber command', handler=chamber,
datatype=EnumType(Operation))
# pollinterval = Parameter(visibility=3)
STATUS_MAP = {
StatusCode.purged_and_sealed: (Status.IDLE, 'purged and sealed'),
StatusCode.vented_and_sealed: (Status.IDLE, 'vented and sealed'),
StatusCode.sealed_unknown: (Status.WARN, 'sealed unknown'),
StatusCode.purge_and_seal: (Status.BUSY, 'purge and seal'),
StatusCode.vent_and_seal: (Status.BUSY, 'vent and seal'),
StatusCode.pumping_down: (Status.BUSY, 'pumping down'),
StatusCode.at_hi_vacuum: (Status.IDLE, 'at hi vacuum'),
StatusCode.pumping_continuously: (Status.IDLE, 'pumping continuously'),
StatusCode.venting_continuously: (Status.IDLE, 'venting continuously'),
StatusCode.general_failure: (Status.ERROR, 'general failure'),
}
channel = 'chamber' channel = 'chamber'
def update_value_status(self, value, packed_status): def update_value_status(self, value, packed_status):
status_code = (packed_status >> 8) & 0xf status_code = (packed_status >> 8) & 0xf
if status_code in self.STATUS_MAP: if status_code in self.status_map:
self.value = status_code self.value = status_code
self.status = self.STATUS_MAP[status_code] self.status = self.status_map[status_code]
else: else:
self.value = self.StatusCode.unknown self.value = self.value_map['unknown']
self.status = (self.Status.ERROR, 'unknown status code %d' % status_code) self.status = (self.Status.ERROR, 'unknown status code %d' % status_code)
def analyze_chamber(self, target): def read_target(self):
return dict(target=target) opcode = int(self.communicate('CHAMBER?'))
return self.opcode2name[opcode]
def change_chamber(self, change): def write_target(self, value):
# write settings, combining <pname>=<value> and current attributes if value == self.target.noop:
# and request updated settings return self.target.noop
if change.target == self.Operation.noop: opcode = self.name2opcode[self.target.enum(value).name]
return None assert self.communicate('CHAMBER %d' % opcode) == 'OK'
return (change.target,) return self.read_target()
class Temp(PpmsMixin, HasIodev, Drivable): class Temp(PpmsBase, Drivable):
"""temperature""" """temperature"""
temp = IOHandler('temp', 'TEMP?', '%g,%g,%d')
Status = Enum( Status = Enum(
Drivable.Status, Drivable.Status,
RAMPING=370, RAMPING=370,
STABILIZING=380, STABILIZING=380,
) )
# pylint: disable=invalid-name value = Parameter(datatype=FloatRange(unit='K'))
ApproachMode = Enum('ApproachMode', fast_settle=0, no_overshoot=1) status = Parameter(datatype=StatusType(Status))
target = Parameter(datatype=FloatRange(1.7, 402.0, unit='K'), needscfg=False)
value = Parameter(datatype=FloatRange(unit='K'), poll=True)
status = Parameter(datatype=StatusType(Status), poll=True)
target = Parameter(datatype=FloatRange(1.7, 402.0, unit='K'), poll=False, needscfg=False)
setpoint = Parameter('intermediate set point', setpoint = Parameter('intermediate set point',
datatype=FloatRange(1.7, 402.0, unit='K'), handler=temp) datatype=FloatRange(1.7, 402.0, unit='K'))
ramp = Parameter('ramping speed', readonly=False, default=0, ramp = Parameter('ramping speed', readonly=False, default=0,
datatype=FloatRange(0, 20, unit='K/min')) datatype=FloatRange(0, 20, unit='K/min'))
workingramp = Parameter('intermediate ramp value', workingramp = Parameter('intermediate ramp value',
datatype=FloatRange(0, 20, unit='K/min'), handler=temp) datatype=FloatRange(0, 20, unit='K/min'), default=0)
approachmode = Parameter('how to approach target!', readonly=False, handler=temp, approachmode = Parameter('how to approach target!', readonly=False,
datatype=EnumType(ApproachMode)) datatype=EnumType(fast_settle=0, no_overshoot=1), default=0)
# pollinterval = Parameter(visibility=3)
timeout = Parameter('drive timeout, in addition to ramp time', readonly=False, timeout = Parameter('drive timeout, in addition to ramp time', readonly=False,
datatype=FloatRange(0, unit='sec'), default=3600) datatype=FloatRange(0, unit='sec'), default=3600)
general_stop = Property('respect general stop', datatype=BoolType(),
# pylint: disable=invalid-name default=True, value=False)
TempStatus = Enum(
'TempStatus',
stable_at_target=1,
changing=2,
within_tolerance=5,
outside_tolerance=6,
filling_emptying_reservoir=7,
standby=10,
control_disabled=13,
can_not_complete=14,
general_failure=15,
)
STATUS_MAP = { STATUS_MAP = {
1: (Status.IDLE, 'stable at target'), 1: (Status.IDLE, 'stable at target'),
2: (Status.RAMPING, 'ramping'), 2: (Status.RAMPING, 'ramping'),
@ -408,8 +368,6 @@ class Temp(PpmsMixin, HasIodev, Drivable):
14: (Status.ERROR, 'can not complete'), 14: (Status.ERROR, 'can not complete'),
15: (Status.ERROR, 'general failure'), 15: (Status.ERROR, 'general failure'),
} }
general_stop = Property('respect general stop', datatype=BoolType(),
default=True, value=False)
channel = 'temp' channel = 'temp'
_stopped = False _stopped = False
@ -420,6 +378,42 @@ class Temp(PpmsMixin, HasIodev, Drivable):
_wait_at10 = False _wait_at10 = False
_ramp_at_limit = False _ramp_at_limit = False
param_names = 'setpoint', 'workingramp', 'approachmode'
@CommonReadHandler(param_names)
def read_params(self):
settings = literal_eval(self.communicate('TEMP?'))
if settings == self._last_settings:
# update parameters only on change, as 'ramp' and 'approachmode' are
# not always sent to the hardware
return
self.setpoint, self.workingramp, self.approachmode = self._last_settings = settings
if self.setpoint != 10 or not self._wait_at10:
self.log.debug('read back target %g %r' % (self.setpoint, self._wait_at10))
self.target = self.setpoint
if self.workingramp != 2 or not self._ramp_at_limit:
self.log.debug('read back ramp %g %r' % (self.workingramp, self._ramp_at_limit))
self.ramp = self.workingramp
def _write_params(self, setpoint, ramp, approachmode):
wait_at10 = False
ramp_at_limit = False
if self.value > 11:
if setpoint <= 10:
wait_at10 = True
setpoint = 10
elif self.value > setpoint:
if ramp >= 2:
ramp = 2
ramp_at_limit = True
self._wait_at10 = wait_at10
self._ramp_at_limit = ramp_at_limit
self.calc_expected(setpoint, ramp)
self.log.debug(
'change_temp v %r s %r r %r w %r l %r' % (self.value, setpoint, ramp, wait_at10, ramp_at_limit))
self.comm_write('TEMP %g,%g,%d' % (setpoint, ramp, approachmode))
self.read_params()
def update_value_status(self, value, packed_status): def update_value_status(self, value, packed_status):
if value is None: if value is None:
self.status = (self.Status.ERROR, 'invalid value') self.status = (self.Status.ERROR, 'invalid value')
@ -437,7 +431,7 @@ class Temp(PpmsMixin, HasIodev, Drivable):
if now > self._cool_deadline: if now > self._cool_deadline:
self._wait_at10 = False self._wait_at10 = False
self._last_change = now self._last_change = now
self.temp.write(self, 'setpoint', self.target) self._write_params(self.target, self.ramp, self.approachmode)
status = (self.Status.STABILIZING, 'waiting at 10 K') status = (self.Status.STABILIZING, 'waiting at 10 K')
if self._last_change: # there was a change, which is not yet confirmed by hw if self._last_change: # there was a change, which is not yet confirmed by hw
if now > self._last_change + 5: if now > self._last_change + 5:
@ -466,41 +460,6 @@ class Temp(PpmsMixin, HasIodev, Drivable):
self._expected_target_time = 0 self._expected_target_time = 0
self.status = status self.status = status
def analyze_temp(self, setpoint, workingramp, approachmode):
if (setpoint, workingramp, approachmode) == self._last_settings:
# update parameters only on change, as 'ramp' and 'approachmode' are
# not always sent to the hardware
return {}
self._last_settings = setpoint, workingramp, approachmode
if setpoint != 10 or not self._wait_at10:
self.log.debug('read back target %g %r' % (setpoint, self._wait_at10))
self.target = setpoint
if workingramp != 2 or not self._ramp_at_limit:
self.log.debug('read back ramp %g %r' % (workingramp, self._ramp_at_limit))
self.ramp = workingramp
result = dict(setpoint=setpoint, workingramp=workingramp)
self.log.debug('analyze_temp %r %r' % (result, (self.target, self.ramp)))
return result
def change_temp(self, change):
ramp = change.ramp
setpoint = change.setpoint
wait_at10 = False
ramp_at_limit = False
if self.value > 11:
if setpoint <= 10:
wait_at10 = True
setpoint = 10
elif self.value > setpoint:
if ramp >= 2:
ramp = 2
ramp_at_limit = True
self._wait_at10 = wait_at10
self._ramp_at_limit = ramp_at_limit
self.calc_expected(setpoint, ramp)
self.log.debug('change_temp v %r s %r r %r w %r l %r' % (self.value, setpoint, ramp, wait_at10, ramp_at_limit))
return setpoint, ramp, change.approachmode
def write_target(self, target): def write_target(self, target):
self._stopped = False self._stopped = False
if abs(self.target - self.value) <= 2e-5 * target and target == self.target: if abs(self.target - self.value) <= 2e-5 * target and target == self.target:
@ -508,23 +467,23 @@ class Temp(PpmsMixin, HasIodev, Drivable):
self._status_before_change = self.status self._status_before_change = self.status
self.status = (self.Status.BUSY, 'changed target') self.status = (self.Status.BUSY, 'changed target')
self._last_change = time.time() self._last_change = time.time()
self.temp.write(self, 'setpoint', target) self._write_params(target, self.ramp, self.approachmode)
self.log.debug('write_target %s' % repr((self.setpoint, target, self._wait_at10))) self.log.debug('write_target %s' % repr((self.setpoint, target, self._wait_at10)))
return target return target
def write_approachmode(self, value): def write_approachmode(self, value):
if self.isDriving(): if self.isDriving():
self.temp.write(self, 'approachmode', value) self._write_params(self.setpoint, self.ramp, value)
return Done return Done
self.approachmode = value self.approachmode = value
return None # do not execute TEMP command, as this would trigger an unnecessary T change return Done # do not execute TEMP command, as this would trigger an unnecessary T change
def write_ramp(self, value): def write_ramp(self, value):
if self.isDriving(): if self.isDriving():
self.temp.write(self, 'ramp', value) self._write_params(self.setpoint, value, self.approachmode)
return Done return Done
# self.ramp = value self.ramp = value
return None # do not execute TEMP command, as this would trigger an unnecessary T change return Done # do not execute TEMP command, as this would trigger an unnecessary T change
def calc_expected(self, target, ramp): def calc_expected(self, target, ramp):
self._expected_target_time = time.time() + abs(target - self.value) * 60.0 / max(0.1, ramp) self._expected_target_time = time.time() + abs(target - self.value) * 60.0 / max(0.1, ramp)
@ -542,10 +501,9 @@ class Temp(PpmsMixin, HasIodev, Drivable):
self._stopped = True self._stopped = True
class Field(PpmsMixin, HasIodev, Drivable): class Field(PpmsBase, Drivable):
"""magnetic field""" """magnetic field"""
field = IOHandler('field', 'FIELD?', '%g,%g,%d,%d')
Status = Enum( Status = Enum(
Drivable.Status, Drivable.Status,
PREPARED=150, PREPARED=150,
@ -554,20 +512,15 @@ class Field(PpmsMixin, HasIodev, Drivable):
STABILIZING=380, STABILIZING=380,
FINALIZING=390, FINALIZING=390,
) )
# pylint: disable=invalid-name value = Parameter(datatype=FloatRange(unit='T'))
PersistentMode = Enum('PersistentMode', persistent=0, driven=1) status = Parameter(datatype=StatusType(Status))
ApproachMode = Enum('ApproachMode', linear=0, no_overshoot=1, oscillate=2) target = Parameter(datatype=FloatRange(-15, 15, unit='T')) # poll only one parameter
ramp = Parameter('ramping speed', readonly=False,
value = Parameter(datatype=FloatRange(unit='T'), poll=True) datatype=FloatRange(0.064, 1.19, unit='T/min'), default=0.19)
status = Parameter(datatype=StatusType(Status), poll=True) approachmode = Parameter('how to approach target', readonly=False,
target = Parameter(datatype=FloatRange(-15, 15, unit='T'), handler=field) datatype=EnumType(linear=0, no_overshoot=1, oscillate=2), default=0)
ramp = Parameter('ramping speed', readonly=False, handler=field, persistentmode = Parameter('what to do after changing field', readonly=False,
datatype=FloatRange(0.064, 1.19, unit='T/min')) datatype=EnumType(persistent=0, driven=1), default=0)
approachmode = Parameter('how to approach target', readonly=False, handler=field,
datatype=EnumType(ApproachMode))
persistentmode = Parameter('what to do after changing field', readonly=False, handler=field,
datatype=EnumType(PersistentMode))
# pollinterval = Parameter(visibility=3)
STATUS_MAP = { STATUS_MAP = {
1: (Status.IDLE, 'persistent mode'), 1: (Status.IDLE, 'persistent mode'),
@ -587,6 +540,25 @@ class Field(PpmsMixin, HasIodev, Drivable):
_last_target = None # last reached target _last_target = None # last reached target
_last_change = 0 # means no target change is pending _last_change = 0 # means no target change is pending
param_names = 'target', 'ramp', 'approachmode', 'persistentmode'
@CommonReadHandler(param_names)
def read_params(self):
settings = literal_eval(self.communicate('FIELD?'))
# print('last_settings tt %s' % repr(self._last_settings))
if settings == self._last_settings:
# we update parameters only on change, as 'ramp' and 'approachmode' are
# not always sent to the hardware
return
target, ramp, self.approachmode, self.persistentmode = self._last_settings = settings
self.target = round(target * 1e-4, 7)
self.ramp = ramp * 6e-3
def _write_params(self, target, ramp, approachmode, persistentmode):
self.comm_write('FIELD %g,%g,%d,%d' % (
target * 1e+4, ramp / 6e-3, approachmode, persistentmode))
self.read_params()
def update_value_status(self, value, packed_status): def update_value_status(self, value, packed_status):
if value is None: if value is None:
self.status = (self.Status.ERROR, 'invalid value') self.status = (self.Status.ERROR, 'invalid value')
@ -621,19 +593,6 @@ class Field(PpmsMixin, HasIodev, Drivable):
status = (status[0], 'stopping (%s)' % status[1]) status = (status[0], 'stopping (%s)' % status[1])
self.status = status self.status = status
def analyze_field(self, target, ramp, approachmode, persistentmode):
# print('last_settings tt %s' % repr(self._last_settings))
if (target, ramp, approachmode, persistentmode) == self._last_settings:
# we update parameters only on change, as 'ramp' and 'approachmode' are
# not always sent to the hardware
return {}
self._last_settings = target, ramp, approachmode, persistentmode
return dict(target=round(target * 1e-4, 7), ramp=ramp * 6e-3, approachmode=approachmode,
persistentmode=persistentmode)
def change_field(self, change):
return change.target * 1e+4, change.ramp / 6e-3, change.approachmode, change.persistentmode
def write_target(self, target): def write_target(self, target):
if abs(self.target - self.value) <= 2e-5 and target == self.target: if abs(self.target - self.value) <= 2e-5 and target == self.target:
self.target = target self.target = target
@ -642,7 +601,7 @@ class Field(PpmsMixin, HasIodev, Drivable):
self._stopped = False self._stopped = False
self._last_change = time.time() self._last_change = time.time()
self.status = (self.Status.BUSY, 'changed target') self.status = (self.Status.BUSY, 'changed target')
self.field.write(self, 'target', target) self._write_params(target, self.ramp, self.approachmode, self.persistentmode)
return Done return Done
def write_persistentmode(self, mode): def write_persistentmode(self, mode):
@ -653,19 +612,19 @@ class Field(PpmsMixin, HasIodev, Drivable):
self._status_before_change = self.status self._status_before_change = self.status
self._stopped = False self._stopped = False
self.status = (self.Status.BUSY, 'changed persistent mode') self.status = (self.Status.BUSY, 'changed persistent mode')
self.field.write(self, 'persistentmode', mode) self._write_params(self.target, self.ramp, self.approachmode, mode)
return Done return Done
def write_ramp(self, value): def write_ramp(self, value):
self.ramp = value self.ramp = value
if self.isDriving(): if self.isDriving():
self.field.write(self, 'ramp', value) self._write_params(self.target, value, self.approachmode, self.persistentmode)
return Done return Done
return None # do not execute FIELD command, as this would trigger a ramp up of leads current return None # do not execute FIELD command, as this would trigger a ramp up of leads current
def write_approachmode(self, value): def write_approachmode(self, value):
if self.isDriving(): if self.isDriving():
self.field.write(self, 'approachmode', value) self._write_params(self.target, self.ramp, value, self.persistentmode)
return Done return Done
return None # do not execute FIELD command, as this would trigger a ramp up of leads current return None # do not execute FIELD command, as this would trigger a ramp up of leads current
@ -680,20 +639,17 @@ class Field(PpmsMixin, HasIodev, Drivable):
self._stopped = True self._stopped = True
class Position(PpmsMixin, HasIodev, Drivable): class Position(PpmsBase, Drivable):
"""rotator position""" """rotator position"""
move = IOHandler('move', 'MOVE?', '%g,%g,%g')
Status = Drivable.Status Status = Drivable.Status
value = Parameter(datatype=FloatRange(unit='deg'), poll=True) value = Parameter(datatype=FloatRange(unit='deg'))
target = Parameter(datatype=FloatRange(-720., 720., unit='deg'), handler=move) target = Parameter(datatype=FloatRange(-720., 720., unit='deg'))
enabled = Parameter('is this channel used?', readonly=False, poll=False, enabled = Parameter('is this channel used?', readonly=False,
datatype=BoolType(), default=True) datatype=BoolType(), default=True)
speed = Parameter('motor speed', readonly=False, handler=move, speed = Parameter('motor speed', readonly=False, default=12,
datatype=FloatRange(0.8, 12, unit='deg/sec')) datatype=FloatRange(0.8, 12, unit='deg/sec'))
# pollinterval = Parameter(visibility=3)
STATUS_MAP = { STATUS_MAP = {
1: (Status.IDLE, 'at target'), 1: (Status.IDLE, 'at target'),
5: (Status.BUSY, 'moving'), 5: (Status.BUSY, 'moving'),
@ -708,6 +664,23 @@ class Position(PpmsMixin, HasIodev, Drivable):
_last_change = 0 _last_change = 0
_within_target = 0 # time since we are within target _within_target = 0 # time since we are within target
param_names = 'target', 'speed'
@CommonReadHandler(param_names)
def read_params(self):
settings = literal_eval(self.communicate('MOVE?'))
if settings == self._last_settings:
# we update parameters only on change, as 'speed' is
# not always sent to the hardware
return
self.target, _, speed = self._last_settings = settings
self.speed = (15 - speed) * 0.8
def _write_params(self, target, speed):
speed = int(round(min(14, max(0, 15 - speed / 0.8)), 0))
self.comm_write('MOVE %g,%d,%d' % (target, 0, speed))
return self.read_params()
def update_value_status(self, value, packed_status): def update_value_status(self, value, packed_status):
if not self.enabled: if not self.enabled:
self.status = (self.Status.DISABLED, 'disabled') self.status = (self.Status.DISABLED, 'disabled')
@ -745,29 +718,17 @@ class Position(PpmsMixin, HasIodev, Drivable):
status = (status[0], 'stopping (%s)' % status[1]) status = (status[0], 'stopping (%s)' % status[1])
self.status = status self.status = status
def analyze_move(self, target, mode, speed):
if (target, speed) == self._last_settings:
# we update parameters only on change, as 'speed' is
# not always sent to the hardware
return {}
self._last_settings = target, speed
return dict(target=target, speed=(15 - speed) * 0.8)
def change_move(self, change):
speed = int(round(min(14, max(0, 15 - change.speed / 0.8)), 0))
return change.target, 0, speed
def write_target(self, target): def write_target(self, target):
self._stopped = False self._stopped = False
self._last_change = 0 self._last_change = 0
self._status_before_change = self.status self._status_before_change = self.status
self.status = (self.Status.BUSY, 'changed target') self.status = (self.Status.BUSY, 'changed target')
self.move.write(self, 'target', target) self._write_params(target, self.speed)
return Done return Done
def write_speed(self, value): def write_speed(self, value):
if self.isDriving(): if self.isDriving():
self.move.write(self, 'speed', value) self._write_params(self.target, value)
return Done return Done
self.speed = value self.speed = value
return None # do not execute MOVE command, as this would trigger an unnecessary move return None # do not execute MOVE command, as this would trigger an unnecessary move

View File

@ -22,12 +22,12 @@
import math import math
import os import os
from os.path import basename, dirname, exists, join from os.path import basename, exists, join
import numpy as np import numpy as np
from scipy.interpolate import splev, splrep # pylint: disable=import-error from scipy.interpolate import splev, splrep # pylint: disable=import-error
from secop.core import Attached, BoolType, Parameter, Readable, StringType, FloatRange from secop.core import Attached, BoolType, Parameter, Readable, StringType
def linear(x): def linear(x):
@ -74,18 +74,13 @@ class Parser340(StdParser):
def parse(self, line): def parse(self, line):
"""scan header for data format""" """scan header for data format"""
if self.header: if self.header:
key, _, value = line.partition(':') if line.startswith("Data Format"):
if value: # this is a header line, as it contains ':' dataformat = line.split(":")[1].strip()[0]
value = value.split()[0] if dataformat == '4':
key = ''.join(key.split()).lower()
if key == 'dataformat':
if value == '4':
self.logx, self.logy = True, False # logOhm self.logx, self.logy = True, False # logOhm
elif value == '5': elif dataformat == '5':
self.logx, self.logy = True, True # logOhm, logK self.logx, self.logy = True, True # logOhm, logK
elif value not in ('1', '2', '3'): elif line.startswith("No."):
raise ValueError('invalid Data Format')
elif 'No.' in line:
self.header = False self.header = False
return return
super().parse(line) super().parse(line)
@ -109,9 +104,7 @@ class CalCurve:
calibname = sensopt.pop(0) calibname = sensopt.pop(0)
_, dot, ext = basename(calibname).rpartition('.') _, dot, ext = basename(calibname).rpartition('.')
kind = None kind = None
pathlist = os.environ.get('FRAPPY_CALIB_PATH', '').split(',') for path in os.environ.get('FRAPPY_CALIB_PATH', '').split(','):
pathlist.append(join(dirname(__file__), 'calcurves'))
for path in pathlist:
# first try without adding kind # first try without adding kind
filename = join(path.strip(), calibname) filename = join(path.strip(), calibname)
if exists(filename): if exists(filename):
@ -141,26 +134,13 @@ class CalCurve:
cls, args = KINDS.get(kind, (StdParser, {})) cls, args = KINDS.get(kind, (StdParser, {}))
args.update(optargs) args.update(optargs)
try:
parser = cls(**args) parser = cls(**args)
with open(filename) as f: with open(filename) as f:
for line in f: for line in f:
parser.parse(line) parser.parse(line)
except Exception as e:
raise ValueError('calib curve %s: %s' % (calibspec, e))
self.convert_x = nplog if parser.logx else linear self.convert_x = nplog if parser.logx else linear
self.convert_y = npexp if parser.logy else linear self.convert_y = npexp if parser.logy else linear
x = np.asarray(parser.xdata) self.spline = splrep(np.asarray(parser.xdata), np.asarray(parser.ydata), s=0)
y = np.asarray(parser.ydata)
if np.all(x[:-1] > x[1:]): # all decreasing
x = np.flip(x)
y = np.flip(y)
elif np.any(x[:-1] >= x[1:]): # some not increasing
raise ValueError('calib curve %s is not monotonic' % calibspec)
try:
self.spline = splrep(x, y, s=0, k=min(3, len(x) - 1))
except (ValueError, TypeError):
raise ValueError('invalid calib curve %s' % calibspec)
def __call__(self, value): def __call__(self, value):
"""convert value """convert value
@ -176,23 +156,17 @@ class Sensor(Readable):
calib = Parameter('calibration name', datatype=StringType(), readonly=False) calib = Parameter('calibration name', datatype=StringType(), readonly=False)
abs = Parameter('True: take abs(raw) before calib', datatype=BoolType(), readonly=False, default=True) abs = Parameter('True: take abs(raw) before calib', datatype=BoolType(), readonly=False, default=True)
value = Parameter(datatype=FloatRange(unit='K')) value = Parameter(unit='K')
pollinterval = Parameter(export=False) pollinterval = Parameter(export=False)
status = Parameter(default=(Readable.Status.ERROR, 'unintialized')) status = Parameter(default=(Readable.Status.ERROR, 'unintialized'))
pollerClass = None description = 'a calibrated sensor value'
_value_error = None _value_error = None
enablePoll = False
def checkProperties(self):
if 'description' not in self.propertyValues:
self.description = '_' # avoid complaining about missing description
super().checkProperties()
def initModule(self): def initModule(self):
self._rawsensor.registerCallbacks(self, ['status']) # auto update status self._rawsensor.registerCallbacks(self, ['status']) # auto update status
self._calib = CalCurve(self.calib) self._calib = CalCurve(self.calib)
if self.description == '_':
self.description = '%r calibrated with curve %r' % (self.rawsensor, self.calib)
def write_calib(self, value): def write_calib(self, value):
self._calib = CalCurve(value) self._calib = CalCurve(value)
@ -200,7 +174,7 @@ class Sensor(Readable):
def update_value(self, value): def update_value(self, value):
if self.abs: if self.abs:
value = abs(float(value)) value = abs(value)
self.value = self._calib(value) self.value = self._calib(value)
self._value_error = None self._value_error = None