153 lines
5.6 KiB
Python
153 lines
5.6 KiB
Python
# *****************************************************************************
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Module authors:
|
|
# Paul M. Neves <pmneves@mit.edu>
|
|
# *****************************************************************************
|
|
|
|
import time
|
|
from frappy.core import Readable, Parameter, FloatRange, HasIO, StringIO, Property, IntRange,\
|
|
IDLE, BUSY, WARN, ERROR, Drivable, BoolType, Attached, StructOf
|
|
|
|
|
|
class ACM1219IO(StringIO):
    """String based communication with the ACM1219."""

    # other terminator combinations tried during development: ('\n', '\r')
    end_of_line = ('\r\n', '\r')
    # latin-1 decodes any byte, so a non-ascii initial reply does not fail
    encoding = 'latin-1'
    identification = [('*IDN?', r'.*,ACM1219,.*')]

    def checkHWIdent(self):
        """Send up to three warm-up ``*IDN?`` queries, then do the real check.

        Each warm-up query is preceded by a 0.5 s pause. Errors raised by a
        warm-up query are ignored on purpose; the first query that succeeds
        ends the warm-up. The inherited identification check runs afterwards
        in every case and is the one that decides success or failure.
        """
        remaining = 3
        while remaining > 0:
            remaining -= 1
            time.sleep(0.5)
            try:
                self.communicate('*IDN?')
            except Exception:
                # best effort only: retry until the attempts are used up
                continue
            break
        super().checkHWIdent()
|
|
|
|
|
|
class Channel(Readable):
    """Readable holding the value of one channel, in pF.

    The class has no IO and no read method of its own.
    NOTE(review): presumably meant to be attached to a module that writes
    its ``value`` from outside - confirm against the configuration.
    """

    value = Parameter('one channel', unit='pF')
|
|
|
|
|
|
class OneChannel(HasIO, Readable):
    """Read a single capacitance channel.

    The ``channel`` property (1 or 2) selects which comma separated field of
    the ``readCVT`` reply is reported as ``value``.
    """

    # communication class used for the automatic creation of the IO module
    ioClass = ACM1219IO

    # override of the inherited value parameter: float in pF, limited range
    value = Parameter(
        'Capacitance of one channel',
        FloatRange(0, 21.096, unit='pF'),
        readonly=True,
    )
    channel_enabled = Parameter('channel on or off', BoolType(), readonly=False)
    channel = Property('the voltage channel', datatype=IntRange(1, 2))
    # software copy of the enable state; it is never read back from the device
    _ch_enabled = False

    def read_value(self):
        """Return the field of the ``readCVT`` reply belonging to ``channel``."""
        fields = self.communicate('readCVT').split(',')
        # channel is 1-based, the reply fields are 0-based
        return float(fields[self.channel - 1])

    def read_status(self):
        """Report IDLE unconditionally.

        The device is not asked for its status ('readStatus' returns tons
        of data).
        """
        return IDLE, ''

    def read_channel_enabled(self):
        """Return the enable state remembered from the last write."""
        return self._ch_enabled

    def write_channel_enabled(self, channel_enabled):
        """Send ``setCIN`` for this channel (enable) or for 0 (disable).

        The remembered state is only updated after the command went through.
        """
        selected = self.channel if channel_enabled else 0
        self.communicate(f'setCIN {selected},0,00.0,00.0,0,00.0,00.0')
        self._ch_enabled = bool(channel_enabled)
        return self.read_channel_enabled()
|
|
|
|
|
|
class BothChannels(HasIO, Readable):
    """Read both capacitance channels in multiplex mode.

    Each poll of ``value`` reads the currently selected channel with
    ``readCVT``, stores the result in the matching struct member and attached
    module (``chan1`` / ``chan2``), then selects the other channel with
    ``setCIN`` so that the next poll reads it.
    """

    # communication class used for the automatic creation of the IO module
    ioClass = ACM1219IO
    chan1 = Attached(Readable)
    chan2 = Attached(Readable)

    # override of the inherited value parameter: struct of both capacitances
    # (pF) and VT (no unit given)
    value = Parameter(
        'Capacitance 1 and 2, and VT',
        StructOf(
            C1=FloatRange(0, 21.096, unit='pF'),
            C2=FloatRange(0, 21.096, unit='pF'),
            VT=FloatRange(-1, 1000, unit=''),
        ),
        readonly=True,
    )
    channels_enabled = Parameter('channels on or off', BoolType(), readonly=False)
    # software copy of the enable state; it is never read back from the device
    _ch_enabled = False
    # channel (1 or 2) that the next read_value call will read
    _channel = 1

    def Xread_value(self):
        """Alternative reader using ``readMUC`` (C1, C2 and VT in one reply).

        NOTE(review): the ``X`` prefix presumably keeps the framework from
        using this method as the read function of ``value`` - confirm before
        renaming or removing it.

        Makes up to five attempts. Returns the freshly read struct on
        success; after five failures a warning is logged and the previous
        ``value`` is returned unchanged.
        """
        max_attempts = 5
        last_error = None
        for _ in range(max_attempts):
            try:
                fields = self.communicate('readMUC').split(',')
                c1 = float(fields[0])
                c2 = float(fields[1])
                vt = float(fields[2])
                self.chan1.value = c1
                self.chan2.value = c2
                return {'C1': c1, 'C2': c2, 'VT': vt}
            except Exception as err:
                # best effort: any failure (communication, short or garbled
                # reply) leads to a retry, but KeyboardInterrupt and
                # SystemExit are no longer swallowed
                last_error = err
        self.log.warning('Max attempt reached for reading arduino. Last error: %r',
                         last_error)
        return self.value

    def read_value(self):
        """Read the selected channel, then switch to the other one.

        Only the struct member of the channel just read is updated; the other
        capacitance and ``VT`` keep the values of the previous ``value``.
        """
        fields = self.communicate('readCVT').split(',')
        # _channel is 1-based, the reply fields are 0-based
        reading = float(fields[self._channel - 1])
        result = dict(self.value)
        result[f'C{self._channel}'] = reading
        getattr(self, f'chan{self._channel}').value = reading
        # toggle between 1 and 2 and select that channel for the next poll
        self._channel = 3 - self._channel
        self.communicate(f'setCIN {self._channel},0,00.0,00.0,0,00.0,00.0')
        return result

    def read_status(self):
        """Report IDLE unconditionally.

        The device is not asked for its status ('readStatus' returns tons
        of data).
        """
        return IDLE, ''

    def read_channels_enabled(self):
        """Return the enable state remembered from the last write."""
        return self._ch_enabled

    def write_channels_enabled(self, channels_enabled):
        """Send the ``setCIN`` / ``setEXC`` commands for on or off.

        NOTE(review): the meaning of the ``setEXC`` arguments is not visible
        in this file - check the device documentation before changing them.
        """
        if channels_enabled:
            self.communicate(f'setCIN {self._channel},0,00.0,00.0,0,00.0,00.0')
            self._ch_enabled = True
            self.communicate('setEXC 2,2,3,1')
        else:
            self.communicate('setCIN 0,0,00.0,00.0,0,00.0,00.0')
            self._ch_enabled = False
            self.communicate('setEXC 0,0,3,1')
        return self.read_channels_enabled()
|
|
|
|
|