Autogain function for SR830 lock-in driver
Change-Id: If07ec9182e5153e1237b9818ce555162f54e0ae5
This commit is contained in:
parent
1bd188e326
commit
416cdd5a88
@ -19,7 +19,11 @@
|
||||
# *****************************************************************************
|
||||
|
||||
import re
|
||||
from frappy.core import StringIO, HasIO, Parameter, EnumType, FloatRange, TupleOf, ERROR, IDLE, WARN
|
||||
import time
|
||||
from frappy.core import StringIO, HasIO, Parameter, EnumType, FloatRange, TupleOf, ERROR, IDLE, WARN, StatusType, \
|
||||
Readable, BUSY
|
||||
|
||||
from frappy.errors import IsBusyError
|
||||
|
||||
|
||||
def string_to_value(value):
|
||||
@ -36,7 +40,7 @@ class SR830_IO(StringIO):
|
||||
identification = [('*IDN?', r'Stanford_Research_Systems,.*')]
|
||||
|
||||
|
||||
class StanfRes(HasIO):
|
||||
class StanfRes(HasIO, Readable):
|
||||
def set_par(self, cmd, *args):
|
||||
head = ','.join([cmd] + [a if isinstance(a, str) else f'{a:g}' for a in args])
|
||||
tail = cmd.replace(' ', '? ')
|
||||
@ -74,6 +78,8 @@ class XY(StanfRes):
|
||||
autorange = Parameter('autorange_on', EnumType('autorange', off=0, soft=1, hard=2),
|
||||
readonly=False, default=0)
|
||||
|
||||
status = Parameter(datatype=StatusType(Readable, 'BUSY'))
|
||||
|
||||
SEN_RANGE = ['2nV', '5nV', '10nV', '20nV', '50nV', '100nV', '200nV', '500nV',
|
||||
'1uV', '2uV', '5uV', '10uV', '20uV', '50uV', '100uV', '200uV', '500uV',
|
||||
'1mV', '2mV', '5mV', '10mV', '20mV', '50mV', '100mV', '200mV', '500mV',
|
||||
@ -96,51 +102,65 @@ class XY(StanfRes):
|
||||
|
||||
ioClass = SR830_IO
|
||||
|
||||
status_messages = [
|
||||
(ERROR, 'execution error', 2, 4),
|
||||
(ERROR, 'illegal command', 2, 5),
|
||||
(ERROR, 'reserve/input overload', 3, 0),
|
||||
(ERROR, 'tc overload', 3, 1),
|
||||
(ERROR, 'output overload', 3, 2),
|
||||
(WARN, 'input queue overflow, cleared', 2, 0),
|
||||
(WARN, 'output queue overflow, cleared', 2, 2),
|
||||
(WARN, 'reference unlock', 3, 3),
|
||||
(WARN, 'freq crosses 200 Hz', 3, 4),
|
||||
(IDLE, 'no scan in progress', 1, 0),
|
||||
(IDLE, 'no command execution in progress', 1, 1),
|
||||
(IDLE, 'unused', 1, 7),
|
||||
(IDLE, '', 2, 1),
|
||||
(IDLE, '', 2, 3),
|
||||
(IDLE, '', 3, 7),
|
||||
(IDLE, '', 4, 0),
|
||||
(IDLE, '', 4, 3),
|
||||
]
|
||||
_autogain_started = 0
|
||||
|
||||
# status = serial poll status byte, standard event status byte, lock-in status byte, error status byte
|
||||
def read_status(self):
|
||||
status_values = [
|
||||
int(self.communicate('*STB?')), # serial poll status byte
|
||||
int(self.communicate('*ESR?')), # standard event status byte
|
||||
int(self.communicate('LIAS?')), # lock-in status byte
|
||||
int(self.communicate('ERRS?')), # error status byte
|
||||
]
|
||||
if time.time() < self._autogain_started + self.tc * 10:
|
||||
return BUSY, 'changing gain'
|
||||
|
||||
for vi in range(1, 5):
|
||||
value = status_values[vi - 1]
|
||||
stb = int(self.communicate('*STB?')) # serial poll status byte
|
||||
esr = int(self.communicate('*ESR?')) # standard event status byte
|
||||
lias = int(self.communicate('LIAS?')) # lock-in status byte
|
||||
errs = int(self.communicate('ERRS?')) # error status byte
|
||||
|
||||
for status_type, status_msg, curr_vi, bit in self.status_messages:
|
||||
if curr_vi == vi and value & (1 << bit):
|
||||
# conv_status = HasConvergence.read_status(self)
|
||||
return status_type, status_msg
|
||||
if lias & (1 << 2):
|
||||
return ERROR, 'output overload'
|
||||
if lias & (1 << 1):
|
||||
return ERROR, 'tc overload'
|
||||
if lias & (1 << 0):
|
||||
return ERROR, 'reserve/input overload'
|
||||
if esr & (1 << 5):
|
||||
return ERROR, 'illegal command'
|
||||
if esr & (1 << 4):
|
||||
return ERROR, 'execution error'
|
||||
if errs & (1 << 1):
|
||||
return ERROR, 'backup error'
|
||||
if errs & (1 << 2):
|
||||
return ERROR, 'RAM error'
|
||||
if errs & (1 << 4):
|
||||
return ERROR, 'ROM error'
|
||||
if errs & (1 << 5):
|
||||
return ERROR, 'GRIB error'
|
||||
if errs & (1 << 6):
|
||||
return ERROR, 'DSP error'
|
||||
if errs & (1 << 7):
|
||||
return ERROR, 'internal math error'
|
||||
if esr & (1 << 0):
|
||||
return WARN, 'input queue overflow, cleared'
|
||||
if esr & (1 << 2):
|
||||
return WARN, 'output queue overflow, cleared'
|
||||
if lias & (1 << 3):
|
||||
return WARN, 'reference unlock'
|
||||
if lias & (1 << 4):
|
||||
return WARN, 'freq crosses 200 Hz'
|
||||
if not stb & (1 << 0):
|
||||
return BUSY, 'scan in progress'
|
||||
if not stb & (1 << 1):
|
||||
return BUSY, 'command execution in progress'
|
||||
return IDLE, ''
|
||||
|
||||
def read_value(self):
|
||||
if self.read_status()[0] == BUSY:
|
||||
raise IsBusyError('changing gain')
|
||||
reply = self.get_par('SNAP? 1, 2')
|
||||
value = tuple(float(x) for x in reply)
|
||||
x, y = value
|
||||
maxxy = max(abs(x), abs(y))
|
||||
if self.autorange == 1:
|
||||
if max(abs(x), abs(y)) >= 0.9 * self.range and self.irange < 26:
|
||||
if maxxy >= 0.9 * self.range and self.irange < 26:
|
||||
self.write_irange(self.irange + 1)
|
||||
elif max(abs(x), abs(y)) <= 0.3 * self.range and self.irange > 0:
|
||||
elif maxxy <= 0.3 * self.range and self.irange > 0:
|
||||
self.write_irange(self.irange - 1)
|
||||
return value
|
||||
|
||||
@ -155,6 +175,8 @@ class XY(StanfRes):
|
||||
def write_irange(self, irange):
|
||||
value = int(irange)
|
||||
self.set_par(f'SENS {value}')
|
||||
self._autogain_started = time.time()
|
||||
self.read_range()
|
||||
return value
|
||||
|
||||
def write_range(self, target):
|
||||
|
Loading…
x
Reference in New Issue
Block a user