# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Markus Zolliker
#
# *****************************************************************************

"""sensirion flow sensor,

connected via an Arduino Nano on a serial connection
shared with the trinamic hepump valve motor
"""

import math
from frappy.core import Parameter, Readable, IntRange, FloatRange, BoolType, BytesIO, HasIO
from frappy.errors import ProgrammingError


class FlowSensor(HasIO, Readable):
    """Readable for a flow sensor talking a simple ASCII line protocol.

    Every request is a single command letter, optionally followed by a
    numeric argument, terminated by a newline (see :meth:`command`).
    """

    value = Parameter(unit='ln/min')
    stddev = Parameter('std dev.', FloatRange(unit='ln/min'))
    nsamples = Parameter('number of samples for averaging', IntRange(1, 1024), default=160)
    offset = Parameter('offset correction', FloatRange(unit='ln/min'), readonly=False, default=0)
    scale = Parameter('scale factor', FloatRange(), readonly=False, default=2.3)
    saved = Parameter('is the current value saved?', BoolType(), readonly=False)
    pollinterval = Parameter(default=0.2)
    ioClass = BytesIO
    # (scale, offset) pair considered to be saved; None until first determined
    _saved = None

    def command(self, cmd, nvalues=1):
        """Send a command to the device and parse the numeric reply.

        :param cmd: a single character for a query, or a command letter
            followed by its argument (at most 6 characters, padded with
            blanks to exactly 6 before sending)
        :param nvalues: number of values expected in the reply
        :return: a float when ``nvalues == 1``, a tuple of floats when
            ``nvalues > 1`` and None when ``nvalues == 0``
        :raises ProgrammingError: when the argument is longer than 6 characters
        """
        if len(cmd) == 1:  # it's a query
            command = f'{cmd}\n'
        else:
            if len(cmd) > 7:
                raise ProgrammingError('number does not fit into 6 characters')
            command = f'{cmd[0]}{cmd[1:].ljust(6)}\n'
        # the second argument is passed on to the io; presumably the expected
        # reply length (9 bytes per value, at least 1) -- confirm with BytesIO
        reply = self.io.communicate(command.encode('ascii'), max(1, nvalues * 9))
        if nvalues == 1:
            return float(reply)
        if nvalues:
            return tuple(float(s) for s in reply.split())
        return None

    def doPoll(self):
        """Query flow and std dev. and update the parameters on change."""
        flow, stddev = self.command('?', nvalues=2)
        # presumably converts the std dev. of the samples into the
        # uncertainty of the averaged value
        stddev = stddev / math.sqrt(self.nsamples)
        if (flow, stddev) != (self.value, self.stddev):
            self.value, self.stddev = flow, stddev
        # TODO: treat status (e.g. when reading 0 always)

    def read_value(self):
        """Trigger a poll and return the updated flow value."""
        self.doPoll()
        return self.value

    def read_nsamples(self):
        """Query the number of samples from the device."""
        return self.command('n')

    def write_nsamples(self, nsamples):
        """Send the number of samples and return the reply of the device."""
        return self.command(f'n{nsamples}')

    def read_offset(self):
        """Query the offset correction from the device."""
        return self.command('o')

    def write_offset(self, offset):
        """Send the offset (2 decimals) and return the reply of the device."""
        return self.command(f'o{offset:.2f}')

    def read_scale(self):
        """Query the scale factor from the device."""
        return self.command('g')

    def write_scale(self, scale):
        """Send the scale (4 decimals) and return the reply of the device."""
        return self.command(f'g{scale:.4f}')

    def read_saved(self):
        """Tell whether the current scale and offset match the saved pair.

        On the first call the values read from the device are taken as the
        saved ones and True is returned.
        """
        if self._saved is None:
            self._saved = self.read_scale(), self.read_offset()
            return True
        return self._saved == (self.scale, self.offset)

    def write_saved(self, target):
        """Send the save command when *target* is true.

        After saving, the reference pair used by :meth:`read_saved` is
        refreshed from the device; otherwise ``saved`` would keep reading
        False after any modification, even when it has been saved.
        """
        if target:
            self.command('s', nvalues=0)
            self._saved = self.read_scale(), self.read_offset()