345 lines
9.6 KiB
Python
345 lines
9.6 KiB
Python
import numpy as np
|
|
import serial
|
|
import serial.tools.list_ports
|
|
|
|
|
|
class ModuleTestBox:
    """Serial-port client for a module test box.

    Commands are sent as short ASCII strings, optionally followed by binary
    arguments; replies are read back as fixed-size little-endian integers.
    The port is opened at 115200 baud with a 0.5 s read timeout.

    Intended to be used as a context manager::

        with ModuleTestBox(hints=[...]) as box:
            if box.IsConnected():
                ...

    NOTE: a read that times out returns fewer bytes than requested. The
    integer getters then decode whatever arrived (an empty reply decodes
    to 0) instead of raising.
    """

    # ADC type names indexed by the type code reported by the 'SV' command.
    _ADC_TYPES = ('ADE9113', 'ADE9112', 'ADE9103')

    def __init__(self, port=None, hints=None, verbose=False):
        """Store the connection settings and enumerate the serial ports.

        Args:
            port: Name of the port to open (COMn), or None.
            hints: Substrings used by FindComPort() to pick a port when
                connecting. When given, the search result replaces *port*
                (it becomes None if nothing matches).
            verbose: Print diagnostic output when True.
        """
        self.ser = None          # port handle, None while disconnected
        self.port = port         # port name COMn
        self.portdesc = ''       # portlist entry of the port chosen by FindComPort()
        self.hints = hints
        self.verbose = verbose
        self.portlist = {}       # port list { port: hardware-id string }
        self.UpdateComPortList()

    def __enter__(self):
        """Try to open the port and return this object.

        No exception is raised when no port could be selected; check
        IsConnected() before issuing commands.
        """
        self._Connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the port and report any exception raised in the ``with`` body.

        Returns:
            Always True, so every exception raised inside the ``with`` block
            is suppressed after being printed.
        """
        self._Disconnect()
        if exc_type is not None:
            print(f'Exception {exc_type} occurred with value {exc_value}')
        # NOTE(review): returning True swallows *all* exceptions, including
        # KeyboardInterrupt.  A leftover comment in the original hints that
        # only serial.SerialException was meant to be handled; kept as-is
        # for backward compatibility.
        return True

    # Update the serial port list
    def UpdateComPortList(self):
        """Refresh ``self.portlist`` from the ports currently present.

        NOTE(review): each entry maps the port name to the hardware-id
        string (``hwid``), not to the human-readable description, so the
        hints given to FindComPort() are matched against the hardware id.
        Confirm that this is intended.
        """
        ports = serial.tools.list_ports.comports()
        self.portlist = {}
        if self.verbose:
            print('* serial ports found:')
        for port, desc, hwid in sorted(ports):
            self.portlist[port] = hwid
            if self.verbose:
                print(f'{port:7s}: {desc}, {hwid}')

    # Finds a specific serial port based on a keyword (hint) in its port list entry
    def FindComPort(self, hints):
        """Select the first listed port whose entry contains one of *hints*.

        Args:
            hints: An iterable of substrings, or a single string (treated as
                one hint rather than being matched character by character).

        Returns:
            The matching port name, or None when nothing matches.  On
            success ``self.port`` and ``self.portdesc`` are updated; on
            failure ``self.port`` is reset to None.
        """
        if isinstance(hints, str):
            hints = (hints,)
        for port, desc in self.portlist.items():
            if any(hint in desc for hint in hints):
                self.port = port
                self.portdesc = desc
                return port
        self.port = None
        return None

    # Checks if there is a connection to a serial port
    def IsConnected(self):
        """Return True while a serial port handle is held."""
        return self.ser is not None

    # Connect to a serial port by specific port or a hint in the port list entry
    def _Connect(self):
        """Open the configured port, resolving it from the hints first if given.

        Does nothing when no port name is available.
        """
        if self.hints is not None:
            self.FindComPort(self.hints)
        if self.port is not None:
            if self.verbose:
                print(f'* open {self.port:s} ({self.portdesc})')
            self.ser = serial.Serial(self.port, 115200, timeout=0.5)

    def _Disconnect(self):
        """Close the port, if open, and drop the handle."""
        if self.IsConnected():
            try:
                self.ser.close()
            finally:
                # Reset the handle so IsConnected() reports the real state.
                self.ser = None

    def TestConnection(self):
        """Check the link to the box and the register access of all 8 ADCs.

        Returns:
            False as soon as an echo test fails; otherwise True only if every
            ADC register write/read-back on channels 1..8 matched.
        """
        testdata = [0x5A, 0xA5]

        # Test connection PC <-> HV box
        for x in testdata:
            y = self.Echo(x)
            if y != x:
                print(f'Communication failed: {y:02X} should be {x:02X}')
                return False

        # Test ADC register read/write (repeat for all ADCs)
        adc_ok = True
        for channel in range(1, 9):
            self.SelectChannel(channel)
            for x in testdata:
                self.ADC_RegisterWrite(0x13, x)
                y = self.ADC_RegisterRead(0x13)
                if y != x:
                    print(f'ADC Register IO failed: {y:02X} should be {x:02X}')
                    adc_ok = False
            if self.verbose:
                adc = self.GetADCversion()
                print(f'CH{channel}: ADC TYPE:{adc["ver"]} REV:{adc["rev"]} ID:{adc["id"]}')
        return adc_ok

    # ----- low level transfer helpers ---------------------------------

    def _WriteInt(self, x, size, signed):
        """Write integer *x* as *size* little-endian bytes."""
        self.ser.write(x.to_bytes(size, byteorder='little', signed=signed))

    def _ReadInt(self, size, signed):
        """Read up to *size* bytes and decode them as a little-endian integer."""
        return int.from_bytes(self.ser.read(size), byteorder='little', signed=signed)

    def _PutStr(self, s):
        """Write string *s* ASCII-encoded."""
        self.ser.write(s.encode('ASCII'))

    def _PutINT8(self, x):
        """Write *x* as a signed 8-bit integer."""
        self._WriteInt(x, 1, True)

    def _PutINT16(self, x):
        """Write *x* as a signed 16-bit little-endian integer."""
        self._WriteInt(x, 2, True)

    def _PutINT32(self, x):
        """Write *x* as a signed 32-bit little-endian integer."""
        self._WriteInt(x, 4, True)

    def _PutUINT8(self, x):
        """Write *x* as an unsigned 8-bit integer."""
        self._WriteInt(x, 1, False)

    def _PutUINT16(self, x):
        """Write *x* as an unsigned 16-bit little-endian integer."""
        self._WriteInt(x, 2, False)

    def _PutUINT32(self, x):
        """Write *x* as an unsigned 32-bit little-endian integer."""
        self._WriteInt(x, 4, False)

    def _GetUINT8(self):
        """Read an unsigned 8-bit integer."""
        return self._ReadInt(1, False)

    def _GetUINT16(self):
        """Read an unsigned 16-bit little-endian integer."""
        return self._ReadInt(2, False)

    def _GetUINT32(self):
        """Read an unsigned 32-bit little-endian integer."""
        return self._ReadInt(4, False)

    def _GetINT32(self):
        """Read a signed 32-bit little-endian integer."""
        return self._ReadInt(4, True)

    def _Get2INT32(self):
        """Read 8 bytes in one call and return them as two signed 32-bit integers."""
        bv = self.ser.read(8)
        first = int.from_bytes(bv[0:4], byteorder='little', signed=True)
        second = int.from_bytes(bv[4:8], byteorder='little', signed=True)
        return first, second

    def _CommandReq(self, command):
        """Send the command string *command* ASCII-encoded."""
        self.ser.write(command.encode('ASCII'))

    def _CommandRsp(self, retform):
        """Read a reply made of consecutive unsigned little-endian fields.

        Args:
            retform: Sequence with the size in bytes of each field.

        Returns:
            A list with one integer per field.  Fields that were not
            received (timeout) decode to 0.
        """
        bv = self.ser.read(sum(retform))
        # split return bytes value
        values = []
        pos = 0
        for size in retform:
            values.append(int.from_bytes(bv[pos:pos + size], byteorder='little', signed=False))
            pos += size
        return values

    def _Request(self, command, retform):
        """Send *command* and return the reply decoded by _CommandRsp()."""
        self._CommandReq(command)
        return self._CommandRsp(retform)

    def _StartScan(self, command, fs, n):
        """Send a scan command with its two arguments and return the reply count.

        *fs* is sent as one unsigned byte and *n* as an unsigned 16-bit
        value; the 16-bit reply is the number of samples that follow.
        """
        self._CommandReq(command)
        self._PutUINT8(fs)
        self._PutUINT16(n)
        return self._GetUINT16()

    # ----- system commands --------------------------------------------

    def GetFirmwareVersion(self):
        """Send 'S0' and return the length-prefixed reply as a string."""
        self._CommandReq('S0')
        n = self._GetUINT8()
        d = [self._GetUINT8() for _ in range(n)]
        return ''.join(chr(i) for i in d)

    def GetBoardID(self):
        """Send 'SI' and return the unsigned 32-bit reply."""
        self._CommandReq('SI')
        return self._GetUINT32()

    def SetBoardID(self, id):
        """Send 'SD' followed by *id* as an unsigned 32-bit value."""
        self._CommandReq('SD')
        self._PutUINT32(id)

    def SelectChannel(self, n):
        """Send the one-character command '1'..'8' for channel *n*.

        Values outside 1..8 are silently ignored.
        """
        if n - 1 in range(8):
            self._CommandReq('12345678'[n - 1])

    # ----- measurements -----------------------------------------------

    def GetI(self):
        """Send 'I' and return the signed 32-bit reply."""
        self._CommandReq('I')
        return self._GetINT32()

    def GetV(self):
        """Send 'V' and return the signed 32-bit reply."""
        self._CommandReq('V')
        return self._GetINT32()

    def GetIV(self):
        """Send 'D' and return the two signed 32-bit replies as a tuple."""
        self._CommandReq('D')
        first = self._GetINT32()
        second = self._GetINT32()
        return first, second

    def ScanI(self, fs, n):
        """Send 'MI' with *fs* and *n* and return the samples as an int array.

        The array length is the count reported by the device.
        NOTE(review): *fs* presumably selects the sample rate and *n* the
        requested sample count - confirm against the firmware.
        """
        count = self._StartScan('MI', fs, n)
        # One read per sample keeps the per-read timeout of the original code.
        return np.fromiter((self._GetINT32() for _ in range(count)), dtype=int, count=count)

    def ScanV(self, fs, n):
        """Send 'MV' with *fs* and *n* and return the samples as an int array.

        The array length is the count reported by the device.
        """
        count = self._StartScan('MV', fs, n)
        return np.fromiter((self._GetINT32() for _ in range(count)), dtype=int, count=count)

    def ScanIV(self, fs, n):
        """Send 'MD' with *fs* and *n* and return two int arrays ``(i, u)``.

        Each sample is read as a pair of signed 32-bit values; the first
        goes to ``i`` and the second to ``u``.
        """
        count = self._StartScan('MD', fs, n)
        i = np.zeros(count, dtype=int)
        u = np.zeros(count, dtype=int)
        for k in range(count):
            i[k], u[k] = self._Get2INT32()
        return i, u

    def GetError(self):
        """Send 'SE' and return the 16-bit status as a readable string."""
        status = self._Request('SE', (2,))[0]
        if status == 0:
            return 'Status Ok'
        return f'Error {status:04X}'

    def GetADCversion(self):
        """Send 'SV' and decode the 8-byte reply.

        Returns:
            A dict with keys ``'ver'`` (type name, ``'?'`` if the type code
            is unknown or missing), ``'rev'`` (byte 0 as two hex digits,
            ``'??'`` if missing) and ``'id'`` (bytes 2..7 as hex,
            ``'????'`` if the reply is shorter than 8 bytes).
        """
        self._CommandReq('SV')
        b = self.ser.read(8)

        rev = f'{b[0]:02X}' if len(b) >= 1 else '??'

        # Guard both the reply length and the table size: an unknown type
        # code must yield '?' rather than an IndexError.
        if len(b) >= 2 and b[1] < len(self._ADC_TYPES):
            ver = self._ADC_TYPES[b[1]]
        else:
            ver = '?'

        if len(b) >= 8:
            chip_id = ''.join(f'{x:02X}' for x in b[2:8])
        else:
            chip_id = '????'

        return {'ver': ver, 'rev': rev, 'id': chip_id}

    # ----- configuration / calibration --------------------------------

    def SetRawData(self, unit=False):
        """Send 'SU' followed by a flag byte (1 if *unit* is true, else 0)."""
        self._CommandReq('SU')
        self._PutUINT8(1 if unit else 0)

    def SetMeanCount(self, n):
        """Send 'SN' followed by *n* as an unsigned byte."""
        self._CommandReq('SN')
        self._PutUINT8(n)

    def AdjustOffsetI(self):
        """Send 'SCI' and return the signed 32-bit reply."""
        self._CommandReq('SCI')
        return self._GetINT32()

    def AdjustOffsetV(self):
        """Send 'SCV' and return the signed 32-bit reply."""
        self._CommandReq('SCV')
        return self._GetINT32()

    def AdjustScaleI(self, value):
        """Send 'SAI' followed by *value* as a signed 32-bit integer."""
        self._CommandReq('SAI')
        self._PutINT32(value)

    def AdjustScaleV(self, value):
        """Send 'SAV' followed by *value* as a signed 32-bit integer."""
        self._CommandReq('SAV')
        self._PutINT32(value)

    # ----- direct ADC access ------------------------------------------

    def ADC_Transmit(self, txdata):
        """Send 'X1' followed by *txdata* as an unsigned 32-bit value."""
        self._CommandReq('X1')
        self._PutUINT32(txdata)

    def ADC_Receive(self):
        """Send 'X2' and return the unsigned 32-bit reply."""
        self._CommandReq('X2')
        return self._GetUINT32()

    def ADC_Transfer(self, txdata):
        """Send 'XT' with *txdata* (unsigned 32-bit) and return the unsigned 32-bit reply."""
        self._CommandReq('XT')
        self._PutUINT32(txdata)
        return self._GetUINT32()

    def ADC_RegisterWrite(self, reg, value):
        """Send 'XW' followed by *reg* and *value*, one unsigned byte each."""
        self._CommandReq('XW')
        self._PutUINT8(reg)
        self._PutUINT8(value)

    def ADC_RegisterRead(self, reg):
        """Send 'XR' with *reg* (one byte) and return the one-byte reply."""
        self._CommandReq('XR')
        self._PutUINT8(reg)
        return self._CommandRsp((1,))[0]

    def ADC_RegisterRead2(self, reg):
        """Send 'Xr' with *reg* (one byte) and return the two reply bytes as a list."""
        self._CommandReq('Xr')
        self._PutUINT8(reg)
        return self._CommandRsp((1, 1))

    def ReadADC_I(self):
        """Send 'XI' and return the signed 32-bit reply."""
        self._CommandReq('XI')
        return self._GetINT32()

    def ReadADC_V(self):
        """Send 'XV' and return the signed 32-bit reply."""
        self._CommandReq('XV')
        return self._GetINT32()

    def ReadADC_IV(self):
        """Send 'XD' and return the two signed 32-bit replies as a tuple."""
        self._CommandReq('XD')
        return self._Get2INT32()

    def ADC_Init(self):
        """Send the 'XC' command (no arguments, no reply is read)."""
        self._CommandReq('XC')

    def ADC_Sleep(self, sleep=True):
        """Send 'XS' followed by a flag byte (1 if *sleep* is true, else 0)."""
        self._CommandReq('XS')
        self._PutINT8(1 if sleep else 0)

    def AddCRC(self, d):
        """Send 'XX' with *d* (unsigned 32-bit) and return the unsigned 32-bit reply."""
        self._CommandReq('XX')
        self._PutUINT32(d)
        return self._CommandRsp((4,))[0]

    def Echo(self, din):
        """Send 'XE' with the byte *din* and return the one-byte reply."""
        self._CommandReq('XE')
        self._PutUINT8(din)
        return self._CommandRsp((1,))[0]

    # ----- bias supply ------------------------------------------------

    def Bias_Enable(self, on):
        """Send 'BC' followed by a flag byte (1 if *on* is true, else 0)."""
        self._CommandReq('BC')
        self._PutINT8(1 if on else 0)

    def Bias_GetAll(self):
        """Send 'BS' and return the one-byte reply."""
        self._CommandReq('BS')
        return self._CommandRsp((1,))[0]

    def Bias_SetV(self, v=0.0):
        """Send 'BV' followed by ``v * 1000`` as a signed 16-bit integer.

        The scaled value is rounded to the nearest integer; plain truncation
        would turn e.g. 1.001 into 1000 because of binary floating point.
        NOTE(review): presumably *v* is in volts and the device expects
        millivolts - confirm against the firmware.
        """
        self._CommandReq('BV')
        self._PutINT16(int(round(v * 1000)))

    def Bias_GetI(self):
        """Send 'BI' and return the signed 32-bit reply divided by 1e9.

        NOTE(review): presumably converts nA to A - confirm.
        """
        self._CommandReq('BI')
        return self._GetINT32() / 1000000000.0

    # ----- high voltage -----------------------------------------------

    def HV_Enable(self, on):
        """Send 'LC' followed by a flag byte (1 if *on* is true, else 0)."""
        self._CommandReq('LC')
        self._PutINT8(1 if on else 0)

    def HV_SetAll(self, on):
        """Send 'LA' followed by a flag byte (1 if *on* is true, else 0)."""
        self._CommandReq('LA')
        self._PutINT8(1 if on else 0)

    def HV_GetAll(self):
        """Send 'LS' and return the one-byte reply."""
        self._CommandReq('LS')
        return self._CommandRsp((1,))[0]

    def HV_IsLocked(self):
        """Send 'SL' and return True when the one-byte reply is non-zero."""
        self._CommandReq('SL')
        return self._CommandRsp((1,))[0] != 0
|