# ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: Oksana Shliakhtun # ***************************************************************************** """Thermo Haake Phoenix P1 Bath Circulator""" import re import time from frappy.core import StringIO, HasIO, Parameter, FloatRange, BoolType, \ EnumType, ERROR, IDLE, WARN, BUSY, Drivable, Command from frappy_psi.convergence import HasConvergence from frappy.errors import CommunicationFailedError def convert(string): """ Converts reply to a number :param string: reply from the command :return: number """ number = re.sub(r'[^0-9.-]', '', string) return float(number) class HaakeIO(StringIO): end_of_line = b'\r' identification = [('V', r'')] # version of the software class TemperatureLoop(HasIO, HasConvergence, Drivable): ioClass = HaakeIO value = Parameter('temperature', unit='degC') target = Parameter('target', datatype=FloatRange(-50, 200), unit='degC', readonly=False) setpoint = Parameter('setpoint', datatype=FloatRange, unit='degC') control_active = Parameter('control on/off', BoolType, readonly=False, default=False) mode = Parameter('internal/external control', EnumType(int=1, ext=2), readonly=False, default=1) status_messages = [ (ERROR, 'Alarm overtemperature', 3, 1), (ERROR, 'Alarm liquid level', 4, 1), (ERROR, 'Alarm motor and pump overloading', 5, 1), (ERROR, 'Alarm via external connection', 6, 1), (ERROR, 'Alarm cooling', 7, 1), (ERROR, 'Alarm Fuzzy control', 8, 1), (ERROR, 'Alarm internal Pt100', 10, 1), (ERROR, 'Alarm external Pt100', 11, 1), (WARN, 'Main relay missing', 2, 1), (IDLE, 'Temperature control is OFF', 0, 0), (IDLE, 'Temperature control is on', 0, 1), ] def get_values_status(self): """ Supplementary command for the operating status method. Removes the extra symbol and converts each status value into integer. :return: array of integers """ reply = self.communicate('B') string = reply.rstrip('$') return [int(val) for val in string] def read_status(self): # control_active update """ Operating status. :return: statu type and message """ values_str = self.get_values_status() self.read_control_active() for status_type, status_msg, position, value in self.status_messages: if values_str[position] == value: # error first conv_status = HasConvergence.read_status(self) if self.isBusy(conv_status): return BUSY, conv_status[1] if 'tolerance' in conv_status[1] else status_msg return status_type, status_msg # error - needs to reset return IDLE, '' def read_value(self): """ F1 - internal temperature, F2 - external temperature :return: float temperature value """ if self.mode == 1: value = self.communicate('F1') else: value = self.communicate('F2') return convert(value) def write_control_active(self, value): """ Turning on/off the heating, pump and regulation :param value: 0 is OFF, 1 is ON :return: """ if value is True: self.communicate('GO') # heating and pump run self.communicate('W SR') # regulation else: self.communicate('ST') # heating and pump stop self.communicate('W ER') return value def read_control_active(self): values_str = self.get_values_status() if values_str[0] == 1: return True else: return False def read_setpoint(self): string = self.communicate('S') return convert(string) def write_target(self, target): """ Selecting Celsius, setting the target :param target: target :return: target """ self.write_control_active(True) self.read_status() self.communicate('W TE C') self.communicate(f'W SW {target}') return target def write_mode(self, mode): """ Switching to internal or external control :param mode: internal/external :return: selected mode """ if mode == 1: self.communicate('W IN') self.communicate('W EX') return mode @Command def clear_errors(self): """ Reset after error. Otherwise the status will not be updated""" if self.read_status()[0] == ERROR: try: self.communicate('ER') except CommunicationFailedError: time.sleep(2) # wait for reset to be complete