Files
frappy/frappy_psi/uniaxial_cell/psi_serial.py

118 lines
3.7 KiB
Python

'''
PSI class to interact with arduino
Read capacitance of FC100 cell
'''
import os
import time
import serial
class psiSerial(serial.Serial):
def __init__(self,**kwargs):
super(psiSerial,self).__init__(**kwargs)
# Default values
#self.port='COM8'
#self.baudrate=9600
#self.bytesize=8
#self.bitstop=1
#self.parity='N'
self.timeout=0.1 # Read timeout of 0.1s default
self.customConnectionError = b'DEVICE CONNECTION ERROR\r\n'
self.maxAttempts = 10
self.sleep_after_open = 0.1 #s
self.endOfLine = "\r"
self.writeWhileEmpty = False
def psi_open(self,**kwargs):
self.custom_is_open = False
while not self.custom_is_open:
try:
self.open()
except:
''
if self.is_open:
a=self.readline()
if a != self.customConnectionError:
self.custom_is_open = True
else:
self.close()
print('Serial port is open. Wait for',self.sleep_after_open,'seconds.')
time.sleep(self.sleep_after_open)
def psi_write(self,string,**kwargs):
'''
IN: string
OUT: serial.Serial.write(bstring)
where bstring is a string decorated by
"b'" +string+"\r\n".
Opt.: book write_debug: writes the input string and output
encoded string.
'''
try:
self.write_debug = kwargs['write_debug']
except:
self.write_debug = False
try:
response_code = kwargs['response_code']
except:
response_code = True
self.write_response = 0
natempt = 0
bstring = string+self.endOfLine
bstring=bstring.encode()
if response_code:
while self.write_response != 8 and natempt < self.maxAttempts:
try:
self.write_response=int(self.write(bstring))
if self.write_debug:
print('Input string:',string)
print('Output:',bstring)
print('write response:',self.write_response)
except:
''
natempt+=1
if natempt >= self.maxAttempts:
print('Max attempt reached for psi_write.')
else:
self.write_response=int(self.write(bstring))
if self.write_debug:
print('Input string:',string)
print('Output:',bstring)
print('write response:',self.write_response)
def psi_read(self,**kwargs):
'''
Function psi_read reads one byte.
'''
return self.read()
def psi_readline(self,**kwargs):
a = self.readline()
return a
def psi_write_readline(self,string,**kwargs):
self.psi_write(string,**kwargs)
a=self.psi_readline(**kwargs)
if a == self.customConnectionError:
print('Connection dropped. Restart.')
print('------------------------------')
self.psi_close()
self.psi_open()
self.psi_write_readline(string,**kwargs)
if a == b'' and self.writeWhileEmpty:
print('a = b"". Retry.')
print('------------------------------')
self.psi_close()
self.psi_open()
self.psi_write_readline(string,**kwargs)
return a
def psi_close(self,**kwargs):
self.close()
time.sleep(self.sleep_after_open)