frappy/frappy_psi/thermofisher.py
Oksana Shliakhtun 2d628e151c finished thermofisher
but: convergence does not work yet properly
Change-Id: I834f8368730c347ba9f08a03eceae1a60fc66f90
2023-04-26 10:23:48 +02:00

165 lines
5.6 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Oksana Shliakhtun <oksana.shliakhtun@psi.ch>
# *****************************************************************************
from frappy.core import StringIO, Parameter, Readable, HasIO, \
Drivable, FloatRange, IDLE, ERROR, WARN, BoolType
from frappy_psi.convergence import HasConvergence
class ThermFishIO(StringIO):
end_of_line = '\r'
identification = [('RVER', r'.*')] # Firmware Version
class SensorA10(HasIO, Readable):
ioClass = ThermFishIO
value = Parameter('internal temperature', unit='degC')
def get_par(self, cmd):
new_cmd = 'R' + cmd
reply = self.communicate(new_cmd)
if any(unit.isalpha() for unit in reply):
reply = ''.join(unit for unit in reply if not unit.isalpha())
return float(reply)
# def set_par(self, cmd, arg):
# new_cmd = 'S' + cmd.format(arg=arg)
# return self.communicate(new_cmd)
# # return self.get_par(cmd)
def read_value(self):
return self.get_par('T')
def read_status(self):
result_str = self.communicate('RUFS')
values_str = result_str.strip().split()
values_int = [int(val) for val in values_str]
v1, v2, v3, v4, v5 = values_int[:5]
status_messages = [
(ERROR, 'high tempr. cutout fault', v2, 0),
(ERROR, 'high RA tempr. fault', v2, 1),
(ERROR, 'high temperature fixed fault', v3, 7),
(ERROR, 'low temperature fixed fault', v3, 6),
(ERROR, 'high temperature fault', v3, 5),
(ERROR, 'low temperature fault', v3, 4),
(ERROR, 'low level fault', v3, 3),
(ERROR, 'circulator fault', v4, 5),
(ERROR, 'high press. cutout', v5, 2),
(ERROR, 'motor overloaded', v5, 1),
(ERROR, 'pump speed fault', v5, 0),
(WARN, 'open internal sensor', v1, 7),
(WARN, 'shorted internal sensor', v1, 6),
(WARN, 'high temperature warn', v3, 2),
(WARN, 'low temperature warn', v3, 1),
(WARN, 'low level warn', v3, 0),
(IDLE, 'max. heating', v5, 5),
(IDLE, 'heating', v5, 6),
(IDLE, 'cooling', v5, 4),
(IDLE, 'max cooling', v5, 3),
(IDLE, '', v4, 3),
]
for status_type, status_msg, vi,bit in status_messages:
if vi & (1 << bit):
return status_type, status_msg
return WARN, 'circulation off'
class TemperatureLoopA10(HasConvergence, SensorA10, Drivable):
value = Parameter('temperature', unit='degC')
target = Parameter('setpoint/target', datatype=FloatRange, unit='degC', default=0)
circ_on = Parameter('is circulation running', BoolType(), readonly=False, default=False)
# pids
p_heat = Parameter('proportional heat parameter', FloatRange(), readonly=False)
i_heat = Parameter('integral heat parameter', FloatRange(), readonly=False)
d_heat = Parameter('derivative heat parameter', FloatRange(), readonly=False)
p_cool = Parameter('proportional cool parameter', FloatRange(), readonly=False)
i_cool = Parameter('integral cool parameter', FloatRange(), readonly=False)
d_cool = Parameter('derivative cool parameter', FloatRange(), readonly=False)
def read_circ_on(self):
return self.communicate('RO')
def write_circ_on(self, circ_on):
circ_on_str = '1' if circ_on else '0'
self.communicate(f'SO {circ_on_str}')
return self.read_circ_on()
def read_target(self):
return self.get_par('S')
def write_target(self, target):
self.write_circ_on('1')
self.communicate(f'SS {target}')
self.start_state()
return target
## heat PID
def read_p_heat(self):
p_heat = self.get_par('PH')
return float(p_heat)
def write_p_heat(self, p_heat):
self.communicate(f'SPH {p_heat}')
return p_heat
def read_i_heat(self):
i_heat = self.get_par('IH')
return float(i_heat)
def write_i_heat(self, i_heat):
self.communicate(f'SIH {i_heat}')
return i_heat
def read_d_heat(self):
d_heat = self.get_par('DH')
return float(d_heat)
def write_d_heat(self, d_heat):
self.communicate(f'SDH {d_heat}')
return d_heat
## cool PID
def read_p_cool(self):
p_cool = self.get_par('PC')
return float(p_cool)
def write_p_cool(self, p_cool):
self.communicate(f'SPC {p_cool}')
return p_cool
def read_i_cool(self):
i_cool = self.get_par('IC')
return float(i_cool)
def write_i_cool(self, i_cool):
self.communicate(f'SIC {i_cool}')
return i_cool
def read_d_cool(self):
d_cool = self.get_par('DC')
return float(d_cool)
def write_d_cool(self, d_cool):
self.communicate(f'SDC {d_cool}')
return d_cool