frappy/frappy_psi/haake.py
Oksana Shliakhtun f0eb7d95f1 Added documentation
Change-Id: Id6e26a4c28fe080a55099cd54d0fa85c15946657
2024-08-27 15:15:01 +02:00

160 lines
5.4 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors: Oksana Shliakhtun <oksana.shliakhtun@psi.ch>
# *****************************************************************************
"""Thermo Haake Phoenix P1 Bath Circulator"""
import re
import time
from frappy.core import StringIO, HasIO, Parameter, FloatRange, BoolType, \
EnumType, ERROR, IDLE, WARN, BUSY, Drivable, Command
from frappy_psi.convergence import HasConvergence
from frappy.errors import CommunicationFailedError
def convert(string):
"""
Converts reply to a number
:param string: reply from the command
:return: number
"""
number = re.sub(r'[^0-9.-]', '', string)
return float(number)
class HaakeIO(StringIO):
end_of_line = b'\r'
identification = [('V', r'')] # version of the software
class TemperatureLoop(HasIO, HasConvergence, Drivable):
ioClass = HaakeIO
value = Parameter('temperature', unit='degC')
target = Parameter('target', datatype=FloatRange(-50, 200), unit='degC', readonly=False)
setpoint = Parameter('setpoint', datatype=FloatRange, unit='degC')
control_active = Parameter('control on/off', BoolType, readonly=False, default=False)
mode = Parameter('internal/external control', EnumType(int=1, ext=2), readonly=False, default=1)
status_messages = [
(ERROR, 'Alarm overtemperature', 3, 1),
(ERROR, 'Alarm liquid level', 4, 1),
(ERROR, 'Alarm motor and pump overloading', 5, 1),
(ERROR, 'Alarm via external connection', 6, 1),
(ERROR, 'Alarm cooling', 7, 1),
(ERROR, 'Alarm Fuzzy control', 8, 1),
(ERROR, 'Alarm internal Pt100', 10, 1),
(ERROR, 'Alarm external Pt100', 11, 1),
(WARN, 'Main relay missing', 2, 1),
(IDLE, 'Temperature control is OFF', 0, 0),
(IDLE, 'Temperature control is on', 0, 1),
]
def get_values_status(self):
"""
Supplementary command for the operating status method.
Removes the extra symbol and converts each status value into integer.
:return: array of integers
"""
reply = self.communicate('B')
string = reply.rstrip('$')
return [int(val) for val in string]
def read_status(self): # control_active update
"""
Operating status.
:return: statu type and message
"""
values_str = self.get_values_status()
self.read_control_active()
for status_type, status_msg, position, value in self.status_messages:
if values_str[position] == value: # error first
conv_status = HasConvergence.read_status(self)
if self.isBusy(conv_status):
return BUSY, conv_status[1] if 'tolerance' in conv_status[1] else status_msg
return status_type, status_msg # error - needs to reset
return IDLE, ''
def read_value(self):
"""
F1 - internal temperature, F2 - external temperature
:return: float temperature value
"""
if self.mode == 1:
value = self.communicate('F1')
else:
value = self.communicate('F2')
return convert(value)
def write_control_active(self, value):
"""
Turning on/off the heating, pump and regulation
:param value: 0 is OFF, 1 is ON
:return:
"""
if value is True:
self.communicate('GO') # heating and pump run
self.communicate('W SR') # regulation
else:
self.communicate('ST') # heating and pump stop
self.communicate('W ER')
return value
def read_control_active(self):
values_str = self.get_values_status()
if values_str[0] == 1:
return True
else:
return False
def read_setpoint(self):
string = self.communicate('S')
return convert(string)
def write_target(self, target):
"""
Selecting Celsius, setting the target
:param target: target
:return: target
"""
self.write_control_active(True)
self.read_status()
self.communicate('W TE C')
self.communicate(f'W SW {target}')
return target
def write_mode(self, mode):
"""
Switching to internal or external control
:param mode: internal/external
:return: selected mode
"""
if mode == 1:
self.communicate('W IN')
self.communicate('W EX')
return mode
@Command
def clear_errors(self):
""" Reset after error"""
if self.read_status()[0] == ERROR:
try:
self.communicate('ER')
except CommunicationFailedError:
time.sleep(2) # wait for reset to be complete