#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors: Oksana Shliakhtun
# *****************************************************************************
"""Frappy driver for a Haake temperature-control unit (string protocol)."""

import re
import time

from frappy.core import (
    StringIO, HasIO, Parameter, FloatRange, BoolType,
    EnumType, ERROR, IDLE, WARN, BUSY, Drivable, Command,
)
from frappy_psi.convergence import HasConvergence
from frappy.errors import CommunicationFailedError


def convert(string):
    """Extract the number contained in a device reply and return it as float.

    Every character that is not a digit, a dot or a minus sign is removed
    before the conversion.

    :param string: raw reply text
    :return: the numeric value as float
    :raises ValueError: if the remaining characters do not form a valid float
        (for example when the reply contains no digits at all)
    """
    # NOTE(review): digits in a non-numeric prefix (e.g. an echoed command
    # name) would be kept and merged into the number - confirm reply format.
    number = re.sub(r'[^0-9.-]', '', string)
    return float(number)


class HaakeIO(StringIO):
    """String based communication with the device; lines end with CR."""
    end_of_line = b'\r'
    identification = [('V', r'')]  # version of the software


class TemperatureLoop(HasIO, HasConvergence, Drivable):
    """Temperature loop of the Haake unit, talking through :class:`HaakeIO`."""
    ioClass = HaakeIO

    value = Parameter('temperature', unit='degC')
    target = Parameter('target', datatype=FloatRange(-50, 200), unit='degC',
                       readonly=False)
    setpoint = Parameter('setpoint', datatype=FloatRange, unit='degC')
    control_active = Parameter('control on/off', BoolType, readonly=False,
                               default=False)
    mode = Parameter('internal/external control', EnumType(int=1, ext=2),
                     readonly=False, default=1)

    # Each entry is (status code, message, flag index, flag value): the entry
    # applies when the status flag at 'flag index' equals 'flag value'.
    # read_status returns on the first matching entry, so the order defines
    # the priority: alarms, then the warning, then the two idle states.
    status_messages = [
        (ERROR, 'Alarm overtemperature', 3, 1),
        (ERROR, 'Alarm liquid level', 4, 1),
        (ERROR, 'Alarm motor and pump overloading', 5, 1),
        (ERROR, 'Alarm via external connection', 6, 1),
        (ERROR, 'Alarm cooling', 7, 1),
        (ERROR, 'Alarm Fuzzy control', 8, 1),
        (ERROR, 'Alarm internal Pt100', 10, 1),
        (ERROR, 'Alarm external Pt100', 11, 1),
        (WARN, 'Main relay missing', 2, 1),
        (IDLE, 'Temperature control is OFF', 0, 0),
        (IDLE, 'Temperature control is on', 0, 1),
    ]

    def get_values_status(self):
        """Query the status flags with the 'B' command.

        :return: list of int, one per character of the reply after trailing
            '$' characters have been stripped
        :raises ValueError: if the reply contains a non-digit character
        """
        reply = self.communicate('B')
        flags = reply.rstrip('$')
        return [int(flag) for flag in flags]

    def read_status(self):
        """Combine the device status flags with the convergence state.

        :return: tuple (status code, status text)
        """
        flags = self.get_values_status()
        # control_active update (this queries the status flags once more)
        self.read_control_active()

        for status_type, status_msg, position, value in self.status_messages:
            if flags[position] != value:
                continue
            # first matching entry wins (errors are listed first)
            conv_status = HasConvergence.read_status(self)
            if self.isBusy(conv_status):
                # while converging, BUSY is reported instead of the matched
                # status code; the convergence text is used only when it
                # mentions 'tolerance'
                text = (conv_status[1] if 'tolerance' in conv_status[1]
                        else status_msg)
                return BUSY, text
            return status_type, status_msg  # error - needs to reset
        return IDLE, ''

    def read_value(self):
        """Read the temperature: 'F1' when mode is 1 (int), else 'F2'."""
        command = 'F1' if self.mode == 1 else 'F2'
        return convert(self.communicate(command))

    def write_control_active(self, value):
        """Switch the temperature control on or off.

        :param value: True to start, False to stop
        :return: the value that was passed in
        """
        if value:
            self.communicate('GO')  # heating and pump run
            self.communicate('W SR')  # regulation
        else:
            self.communicate('ST')  # heating and pump stop
            self.communicate('W ER')
        return value

    def read_control_active(self):
        """Return True when the first status flag equals 1."""
        return self.get_values_status()[0] == 1

    def read_setpoint(self):
        """Read the setpoint with the 'S' command and return it as float."""
        return convert(self.communicate('S'))

    def write_target(self, target):
        """Activate the control and send the new target to the device.

        :param target: requested temperature
        :return: the target that was passed in
        """
        self.write_control_active(True)
        self.read_status()
        self.communicate('W TE C')
        self.communicate(f'W SW {target}')
        return target

    def write_mode(self, mode):
        """Select internal (1) or external (2) control.

        Exactly one command is sent: 'W IN' for mode 1, 'W EX' otherwise.
        Previously 'W EX' was sent unconditionally, also right after 'W IN',
        so the internal mode could never stay selected.

        :param mode: requested mode
        :return: the mode that was passed in
        """
        if mode == 1:
            self.communicate('W IN')
        else:
            self.communicate('W EX')
        return mode

    @Command
    def clear_errors(self):
        """ Reset after error"""
        # nothing is sent unless the current status is an error
        if self.read_status()[0] == ERROR:
            try:
                self.communicate('ER')
            except CommunicationFailedError:
                # presumably the device does not answer while it resets, so
                # the failed exchange is tolerated - TODO confirm
                time.sleep(2)  # wait for reset to be complete