Implement ramping qnw
Change-Id: I9d7fab73194a0a8be3a230cc7ca99066d2553fce
This commit is contained in:
@ -21,7 +21,7 @@
|
|||||||
|
|
||||||
|
|
||||||
from frappy.core import Readable, Parameter, FloatRange, IDLE, ERROR, BoolType,\
|
from frappy.core import Readable, Parameter, FloatRange, IDLE, ERROR, BoolType,\
|
||||||
StringIO, HasIO, Property, Writable, Drivable, BUSY, StringType
|
StringIO, HasIO, Property, Writable, Drivable, BUSY, StringType, Done
|
||||||
from frappy.errors import InternalError
|
from frappy.errors import InternalError
|
||||||
|
|
||||||
|
|
||||||
@ -33,18 +33,9 @@ class QnwIO(StringIO):
|
|||||||
|
|
||||||
class SensorTC1(HasIO, Readable):
|
class SensorTC1(HasIO, Readable):
|
||||||
ioClass = QnwIO
|
ioClass = QnwIO
|
||||||
value = Parameter(unit='degC')
|
value = Parameter(unit='degC', min=-15, max=120)
|
||||||
channel = Property('channel name', StringType())
|
channel = Property('channel name', StringType())
|
||||||
|
|
||||||
ERROR_MAP = {
|
|
||||||
-1: (IDLE, ''),
|
|
||||||
5: (ERROR, 'Cell T out of range (Loose cable? Sensor failure?)'),
|
|
||||||
6: (ERROR, 'Cell and heat exchanger T out of range (Loose cable?)'),
|
|
||||||
7: (ERROR, 'Heat exchanger T out of range (Loose cable? Sensor failure?)'),
|
|
||||||
8: (ERROR, 'Inadequate coolant (check flow). Temperature control has shut down'),
|
|
||||||
9: (ERROR, 'Syntax error')
|
|
||||||
}
|
|
||||||
|
|
||||||
def set_param(self, adr, value=None):
|
def set_param(self, adr, value=None):
|
||||||
short = adr.split()[0]
|
short = adr.split()[0]
|
||||||
# try 3 times in case we got an asynchronous message
|
# try 3 times in case we got an asynchronous message
|
||||||
@ -69,17 +60,18 @@ class SensorTC1(HasIO, Readable):
|
|||||||
return self.get_param(self.channel)
|
return self.get_param(self.channel)
|
||||||
|
|
||||||
def read_status(self):
|
def read_status(self):
|
||||||
reply = self.get_param('IS') # instrument status
|
dt = self.parameters['value'].datatype
|
||||||
if reply[0] == 1:
|
if dt.min <= self.value <= dt.max:
|
||||||
return self.ERROR_MAP[int(self.get_param('ER'))]
|
return IDLE, ''
|
||||||
return IDLE, ''
|
return ERROR, 'value out of range (cable unplugged?)'
|
||||||
|
|
||||||
|
|
||||||
class TemperatureLoopTC1(SensorTC1, Drivable):
|
class TemperatureLoopTC1(SensorTC1, Drivable):
|
||||||
value = Parameter('temperature', unit='degC')
|
value = Parameter('temperature', unit='degC')
|
||||||
target = Parameter('setpoint', unit='degC')
|
target = Parameter('setpoint', unit='degC', min=-5, max=110)
|
||||||
control = Parameter('temperature control flag', BoolType(), readonly=False)
|
control = Parameter('temperature control flag', BoolType(), readonly=False)
|
||||||
ramp = Parameter('ramping value', FloatRange, unit='degC/min', readonly=False)
|
ramp = Parameter('ramping value', FloatRange, unit='degC/min', readonly=False)
|
||||||
|
ramp_used = Parameter('ramping status', BoolType(), default=False, readonly=False)
|
||||||
target_min = Parameter('lowest target temperature', FloatRange, unit='degC')
|
target_min = Parameter('lowest target temperature', FloatRange, unit='degC')
|
||||||
target_max = Parameter('maximum target temperature', FloatRange, unit='degC')
|
target_max = Parameter('maximum target temperature', FloatRange, unit='degC')
|
||||||
|
|
||||||
@ -90,44 +82,73 @@ class TemperatureLoopTC1(SensorTC1, Drivable):
|
|||||||
return self.get_param('MT')
|
return self.get_param('MT')
|
||||||
|
|
||||||
def read_status(self):
|
def read_status(self):
|
||||||
reply = self.get_param('IS') # instrument status
|
status = super().read_status()
|
||||||
if reply[0] == 1:
|
if status[0] == ERROR:
|
||||||
return self.ERROR_MAP[int(self.get_param('ER'))]
|
return status
|
||||||
|
reply = self.get_param('IS') # instrument status
|
||||||
|
if len(reply) < 5:
|
||||||
|
self.set_param('IS', 'E+')
|
||||||
|
reply = self.get_param('IS') # instrument status
|
||||||
self.control = reply[2] == '+'
|
self.control = reply[2] == '+'
|
||||||
|
if reply[4] == '+':
|
||||||
|
return BUSY, 'ramping'
|
||||||
if reply[3] == 'C':
|
if reply[3] == 'C':
|
||||||
|
if self.ramp_used:
|
||||||
|
return BUSY, 'stabilizing'
|
||||||
return BUSY, 'changing'
|
return BUSY, 'changing'
|
||||||
return IDLE, ''
|
return IDLE, ''
|
||||||
|
|
||||||
def write_target(self, target):
|
def write_target(self, target):
|
||||||
self.write_control(True)
|
if self.ramp_used:
|
||||||
return self.set_param('TT S', target)
|
self.set_param('RR S', self.ramp)
|
||||||
|
else:
|
||||||
|
self.set_param('RR S', 0)
|
||||||
|
target = self.set_param('TT S', target)
|
||||||
|
self.set_param('TC', '+')
|
||||||
|
self.read_status()
|
||||||
|
return target
|
||||||
|
|
||||||
def read_target(self):
|
def read_target(self):
|
||||||
return self.get_param('TT')
|
return self.get_param('TT')
|
||||||
|
|
||||||
def write_control(self, control):
|
def write_control(self, control):
|
||||||
sign = '-+'[control]
|
if control:
|
||||||
return self.set_param('TC', sign)
|
if not self.read_control():
|
||||||
|
self.write_target(self.value)
|
||||||
|
return True
|
||||||
|
self.set_param('TC', '-')
|
||||||
|
return False
|
||||||
|
|
||||||
def read_ramp(self):
|
def read_ramp(self):
|
||||||
reply = self.get_param('RR')
|
return float(self.get_param('RR'))
|
||||||
if reply == 'W':
|
|
||||||
return 'waiting'
|
def write_ramp(self, ramp):
|
||||||
try:
|
ramp = max(0.01, abs(ramp))
|
||||||
return float(reply)
|
self.ramp_used = True
|
||||||
except ValueError:
|
ramp = self.set_param('RR S', ramp)
|
||||||
return reply
|
if self.control:
|
||||||
|
self.ramp = ramp
|
||||||
|
self.write_target(self.target)
|
||||||
|
return Done
|
||||||
|
return ramp
|
||||||
|
|
||||||
|
def write_ramp_used(self, value):
|
||||||
|
if self.control:
|
||||||
|
self.ramp_used = value
|
||||||
|
self.write_target(self.target)
|
||||||
|
return Done
|
||||||
|
return value
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
if self.control and self.ramp_used:
|
||||||
|
self.write_target(self.value)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def write_ramp(self, rate):
|
|
||||||
return self.set_param('RR S', rate)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# def write_target(self, target):
|
|
||||||
# target_ = self.communicate(f'[F1 TT S {target}')
|
|
||||||
# T_high_lim = float(self.communicate(f'[F1 MT ?'))
|
|
||||||
# T_low_lim = float(self.communicate(f'[F1 LT ?'))
|
|
||||||
# if T_low_lim < target_ < T_high_lim:
|
|
||||||
# return self.communicate(f'[F1 TC +')
|
|
||||||
# return 'Error'
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user