"""PSI class to interact with an Arduino.

Reads the capacitance of an FC100 cell over a serial link.
"""
import os
import time

import serial


class psiSerial(serial.Serial):
    """``serial.Serial`` subclass with retrying open/write/read helpers.

    The device side is expected to send ``customConnectionError`` when its
    own connection is not ready; the helpers below treat that line as a
    signal to close and reopen the port.

    Attributes set by ``__init__``:
        customConnectionError: byte string that marks a failed connection.
        maxAttempts: maximum number of write attempts in ``psi_write``.
        sleep_after_open: pause in seconds after opening/closing the port.
        endOfLine: terminator appended to every command before encoding.
        writeWhileEmpty: when true, ``psi_write_readline`` reconnects and
            retries if the reply is empty.
        custom_is_open: true once ``psi_open`` has received a line that is
            not ``customConnectionError``.
        write_debug: whether the last ``psi_write`` printed debug output.
        write_response: byte count returned by the last ``write`` call.
    """

    def __init__(self, **kwargs):
        """Create the port, forwarding every keyword to ``serial.Serial``.

        A read timeout of 0.1 s is applied only when the caller does not
        pass ``timeout`` explicitly; port, baud rate, byte size, stop bits
        and parity are taken from ``kwargs`` or the pyserial defaults.
        """
        # Apply the default *before* construction so that an explicit
        # ``timeout=...`` from the caller is no longer silently overwritten.
        kwargs.setdefault('timeout', 0.1)
        super(psiSerial, self).__init__(**kwargs)
        self.customConnectionError = b'DEVICE CONNECTION ERROR\r\n'
        self.maxAttempts = 10
        self.sleep_after_open = 0.1  # s
        self.endOfLine = "\r"
        self.writeWhileEmpty = False
        # State written by the psi_* helpers; initialised here so reading
        # them before the first call does not raise AttributeError.
        self.custom_is_open = False
        self.write_debug = False
        self.write_response = 0

    def psi_open(self, **kwargs):
        """Open the port and wait until the device stops reporting an error.

        Loops until the port is open and the first line read differs from
        ``customConnectionError``, then sleeps ``sleep_after_open`` seconds.
        There is no attempt limit: the call blocks for as long as the port
        cannot be opened or the device keeps sending the error line.
        """
        self.custom_is_open = False
        while not self.custom_is_open:
            try:
                self.open()
            except (serial.SerialException, OSError):
                # Also raised when the port is already open; ``is_open``
                # below tells the two situations apart.
                pass
            if not self.is_open:
                # Back off instead of spinning at full CPU speed while the
                # port is unavailable.
                time.sleep(self.sleep_after_open)
                continue
            first_line = self.readline()
            if first_line != self.customConnectionError:
                self.custom_is_open = True
            else:
                self.close()
        print('Serial port is open. Wait for', self.sleep_after_open, 'seconds.')
        time.sleep(self.sleep_after_open)

    def _print_write_debug(self, string, bstring):
        """Print the command, its encoded form and the write count if debugging."""
        if self.write_debug:
            print('Input string:', string)
            print('Output:', bstring)
            print('write response:', self.write_response)

    def psi_write(self, string, **kwargs):
        """Encode ``string`` plus ``endOfLine`` and write it to the port.

        Args:
            string: command text without terminator.
            write_debug (bool, keyword, default False): print the input
                string, the encoded bytes and the write count.
            response_code (bool, keyword, default True): when true, retry
                up to ``maxAttempts`` times until ``write`` reports that
                the whole command was written; when false, write once and
                let any exception propagate.

        The byte count of the last write is stored in ``write_response``.
        """
        self.write_debug = kwargs.get('write_debug', False)
        response_code = kwargs.get('response_code', True)
        self.write_response = 0
        bstring = (string + self.endOfLine).encode()

        if not response_code:
            self.write_response = int(self.write(bstring))
            self._print_write_debug(string, bstring)
            return

        # ``write`` returns the number of bytes written, so success means
        # the full encoded command went out. (A fixed expected count of 8
        # is only right for 7-character commands and makes every other
        # command be re-sent ``maxAttempts`` times.)
        expected = len(bstring)
        for _ in range(self.maxAttempts):
            try:
                self.write_response = int(self.write(bstring))
            except (serial.SerialException, OSError):
                continue
            self._print_write_debug(string, bstring)
            if self.write_response == expected:
                break
        else:
            # Only reached when no attempt wrote the complete command.
            print('Max attempt reached for psi_write.')

    def psi_read(self, **kwargs):
        """Read and return one byte from the port."""
        return self.read()

    def psi_readline(self, **kwargs):
        """Read and return one line from the port."""
        return self.readline()

    def psi_write_readline(self, string, **kwargs):
        """Write ``string`` and return the line the device answers with.

        If the answer is ``customConnectionError``, or is empty while
        ``writeWhileEmpty`` is set, the port is closed and reopened and the
        command is sent again; the answer of that retry is returned.
        Keyword arguments are forwarded to ``psi_write``.
        """
        self.psi_write(string, **kwargs)
        a = self.psi_readline(**kwargs)
        if a == self.customConnectionError:
            print('Connection dropped. Restart.')
            print('------------------------------')
            self.psi_close()
            self.psi_open()
            # Return the retry's answer rather than the stale error line.
            return self.psi_write_readline(string, **kwargs)
        if a == b'' and self.writeWhileEmpty:
            print('a = b"". Retry.')
            print('------------------------------')
            self.psi_close()
            self.psi_open()
            return self.psi_write_readline(string, **kwargs)
        return a

    def psi_close(self, **kwargs):
        """Close the port and pause ``sleep_after_open`` seconds."""
        self.close()
        self.custom_is_open = False
        time.sleep(self.sleep_after_open)