# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Paul M. Neves
# *****************************************************************************

import time

from frappy.core import Readable, Parameter, FloatRange, HasIO, StringIO, Property, IntRange,\
    IDLE, BUSY, WARN, ERROR, Drivable, BoolType, Attached, StructOf


def _set_cin_command(channel):
    """Return the ``setCIN`` command string selecting *channel*.

    The code in this file sends 0 to switch the input off and 1 or 2 to
    select a capacitance channel.  The remaining comma separated fields are
    always sent with the same fixed values.
    NOTE(review): the meaning of those fixed fields is not visible in this
    file - check the ACM1219 documentation before changing them.
    """
    return f'setCIN {channel},0,00.0,00.0,0,00.0,00.0'


class ACM1219IO(StringIO):
    """communication with ACM1219"""
    # alternatives tried previously: ('\n', '\r') and ('\r\n', '\r')
    # NOTE(review): presumably (reply terminator, command terminator) -
    # confirm against the frappy StringIO documentation
    end_of_line = ('\r\n', '\r')
    encoding = 'latin-1'  # initial reply might not be ascii for a strange reason
    identification = [('*IDN?', r'.*,ACM1219,.*')]

    def checkHWIdent(self):
        """Send up to three throw-away ``*IDN?`` requests, then do the real check.

        Each attempt is preceded by a 0.5 s pause.  Errors of these warm-up
        requests are ignored on purpose (best effort); the loop stops at the
        first request that does not raise.  The actual verification of the
        identification reply is left to the inherited implementation.
        """
        for _ in range(3):
            time.sleep(0.5)
            try:
                self.communicate('*IDN?')
                break
            except Exception:
                # deliberate: a failing warm-up request is simply retried
                pass
        super().checkHWIdent()


class Channel(Readable):
    """passive module holding the value of one channel

    Within this file its value is assigned by BothChannels, which refers to
    two such modules through its attached ``chan1`` / ``chan2``.
    """
    value = Parameter('one channel', unit='pF')


class OneChannel(HasIO, Readable):
    """read a single capacitance channel"""

    # define the communication class for automatic creation of the IO module
    ioClass = ACM1219IO

    # modifying a property of inherited parameters (unit is propagated to the FloatRange datatype)
    value = Parameter('Capacitance of one channel',
                      FloatRange(0, 21.096, unit='pF'),
                      readonly=True)
    channel_enabled = Parameter('channel on or off', BoolType(), readonly=False)
    channel = Property('the voltage channel', datatype=IntRange(1, 2))

    # last state written through write_channel_enabled (not read back from hardware)
    _ch_enabled = False

    def read_value(self):
        """Return the field of the ``readCVT`` reply belonging to ``channel``.

        The reply is split at commas; field ``channel - 1`` is converted to
        float.
        """
        fields = self.communicate('readCVT').split(',')
        return float(fields[self.channel - 1])

    def read_status(self):
        """Always report IDLE with an empty status text."""
        # the device command 'readStatus' is not used: it returns tons of data
        return IDLE, ''

    def read_channel_enabled(self):
        """Return the locally remembered enable state."""
        return self._ch_enabled

    def write_channel_enabled(self, channel_enabled):
        """Select the configured channel (enable) or channel 0 (disable).

        The requested state is remembered locally after the command was sent
        and is returned via read_channel_enabled.
        """
        selected = self.channel if channel_enabled else 0
        self.communicate(_set_cin_command(selected))
        self._ch_enabled = bool(channel_enabled)
        return self.read_channel_enabled()


class BothChannels(HasIO, Readable):
    """read both capacitance channels in multiplex mode"""

    # define the communication class for automatic creation of the IO module
    ioClass = ACM1219IO

    chan1 = Attached(Readable)
    chan2 = Attached(Readable)
    # modifying a property of inherited parameters (unit is propagated to the FloatRange datatype)
    value = Parameter('Capacitance 1 and 2, and VT',
                      StructOf(C1=FloatRange(0, 21.096, unit='pF'),
                               C2=FloatRange(0, 21.096, unit='pF'),
                               VT=FloatRange(-1, 1000, unit='')),
                      readonly=True)
    channels_enabled = Parameter('channels on or off', BoolType(), readonly=False)

    # last state written through write_channels_enabled (not read back from hardware)
    _ch_enabled = False
    # channel (1 or 2) whose reading is taken on the next read_value call
    _channel = 1

    def Xread_value(self):
        """Alternative read-out using a single ``readMUC`` request.

        The name does not follow the ``read_<parameter>`` pattern, so
        read_value below is the method defined under that name.

        The reply is split at commas and the first three fields are taken as
        C1, C2 and VT.  On success the attached channel modules are updated
        and the three values are returned as a dict.  Any failure is retried,
        up to 5 attempts in total; if all fail, a warning is logged and the
        current ``value`` is returned unchanged.
        """
        max_attempts = 5
        for attempt in range(1, max_attempts + 1):
            try:
                # using the inherited HasIO.communicate method to send a command and get the reply
                fields = self.communicate('readMUC').split(',')
                c1 = float(fields[0])
                c2 = float(fields[1])
                vt = float(fields[2])
                self.chan1.value = c1
                self.chan2.value = c2
                return {'C1': c1, 'C2': c2, 'VT': vt}
            except Exception as exc:
                # best effort: retry, but do not swallow KeyboardInterrupt /
                # SystemExit as a bare 'except:' would
                self.log.debug('readMUC attempt %d/%d failed: %r',
                               attempt, max_attempts, exc)
        self.log.warning('Max attempt reached for reading arduino.')
        return self.value

    def read_value(self):
        """Read the currently selected channel, then switch to the other one.

        The ``readCVT`` reply is split at commas and field ``_channel - 1`` is
        converted to float.  The result replaces the matching ``C1`` / ``C2``
        entry in a copy of the current value and is also assigned to the
        corresponding attached channel module.  Entries not read in this call
        (the other channel and ``VT``) keep their previous content.
        Afterwards ``_channel`` is toggled between 1 and 2 and the new channel
        is selected on the device for the next call.
        """
        fields = self.communicate('readCVT').split(',')
        current = self._channel
        reading = float(fields[current - 1])

        result = dict(self.value)
        result[f'C{current}'] = reading
        getattr(self, f'chan{current}').value = reading

        # 3 - n maps 1 -> 2 and 2 -> 1
        self._channel = 3 - current
        self.communicate(_set_cin_command(self._channel))
        return result

    def read_status(self):
        """Always report IDLE with an empty status text."""
        # the device command 'readStatus' is not used: it returns tons of data
        return IDLE, ''

    def read_channels_enabled(self):
        """Return the locally remembered enable state."""
        return self._ch_enabled

    def write_channels_enabled(self, channels_enabled):
        """Switch the input selection and the excitation on or off.

        Enabling selects the channel that is next in the multiplex sequence
        and sends ``setEXC 2,2,3,1``; disabling selects channel 0 and sends
        ``setEXC 0,0,3,1``.  The requested state is remembered locally between
        the two commands and is returned via read_channels_enabled.
        NOTE(review): the meaning of the setEXC arguments is not visible in
        this file - check the ACM1219 documentation before changing them.
        """
        if channels_enabled:
            selected, excitation = self._channel, 'setEXC 2,2,3,1'
        else:
            selected, excitation = 0, 'setEXC 0,0,3,1'
        self.communicate(_set_cin_command(selected))
        self._ch_enabled = bool(channels_enabled)
        self.communicate(excitation)
        return self.read_channels_enabled()