Files
HVDistributionBox/ModuleTestBox.py
2025-06-02 09:32:52 +02:00

345 lines
9.6 KiB
Python

import numpy as np
import serial
import serial.tools.list_ports
class ModuleTestBox:
def __init__(self, port=None, hints=None, verbose=False):
self.ser = None # port handle
self.port = port # port name COMn
self.portdesc = ''
self.hints = hints
self.verbose = verbose
self.portlist = {} # port list { port: description }
self.UpdateComPortList()
def __enter__(self):
self._Connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
self._Disconnect()
if exc_type is not None:
print(f"Exception {exc_type}') # occurred with value {exc_value}")
# except serial.SerialException:
return True
# Update the serial port list
def UpdateComPortList(self):
ports = serial.tools.list_ports.comports()
self.portlist = {}
if self.verbose: print('* serial ports found:')
for port, desc, hwid in sorted(ports):
self.portlist[port] = hwid
if self.verbose: print(f'{port:7s}: {desc}, {hwid}')
# Finds a specific serial port based on a keyword (hint) in the description
def FindComPort(self, hints):
for port, desc in self.portlist.items():
for hint in hints:
if hint in desc:
self.port = port
self.portdesc = desc
return port
self.port = None
return None
# Checks if there is a connection to a serial port
def IsConnected(self):
return self.ser is not None
# Connect to a serial port by specific port or a hint in the description
def _Connect(self):
if self.hints is not None:
self.FindComPort(self.hints)
if self.port is not None:
if self.verbose: print(f'* open {self.port:s} ({self.portdesc})')
self.ser = serial.Serial(self.port, 115200, timeout=0.5)
def _Disconnect(self):
if self.IsConnected():
self.ser.close()
def TestConnection(self):
testdata = [0x5A, 0xA5]
# Test connection PC <-> HV box
for x in testdata:
y = self.Echo(x)
if y != x:
print(f'Communication failed: {y:02X} should be {x:02X}')
return False
# Test ADC register read/write (repeat for all ADCs)
adc_ok = True
for channel in range(1, 9):
self.SelectChannel(channel)
for x in testdata:
self.ADC_RegisterWrite(0x13, x)
y = self.ADC_RegisterRead(0x13)
if y != x:
print(f'ADC Register IO failed: {y:02X} should be {x:02X}')
adc_ok = False
if self.verbose:
adc = self.GetADCversion()
print(f'CH{channel}: ADC TYPE:{adc["ver"]} REV:{adc["rev"]} ID:{adc["id"]}')
return adc_ok
def _PutStr(self, s):
self.ser.write(s.encode('ASCII'))
def _PutINT8(self, x):
self.ser.write(x.to_bytes(1, signed=True, byteorder='little'))
def _PutINT16(self, x):
self.ser.write(x.to_bytes(2, signed=True, byteorder='little'))
def _PutINT32(self, x):
self.ser.write(x.to_bytes(4, signed=True, byteorder='little'))
def _PutUINT8(self, x):
self.ser.write(x.to_bytes(1, signed=False, byteorder='little'))
def _PutUINT16(self, x):
self.ser.write(x.to_bytes(2, signed=False, byteorder='little'))
def _PutUINT32(self, x):
self.ser.write(x.to_bytes(4, signed=False, byteorder='little'))
def _GetUINT8(self):
bv = self.ser.read(size=1)
return int.from_bytes(bv, signed=False, byteorder='little')
def _GetUINT16(self):
bv = self.ser.read(size=2)
return int.from_bytes(bv, signed=False, byteorder='little')
def _GetUINT32(self):
bv = self.ser.read(size=4)
return int.from_bytes(bv, signed=False, byteorder='little')
def _GetINT32(self):
bv = self.ser.read(size=4)
return int.from_bytes(bv, signed=True, byteorder='little')
def _Get2INT32(self):
bv = self.ser.read(size=8)
i = int.from_bytes(bv[0:4], signed=True, byteorder='little')
u = int.from_bytes(bv[4:8], signed=True, byteorder='little')
return i, u
def _CommandReq(self, command):
self.ser.write(command.encode('ASCII'))
def _CommandRsp(self, retform):
n = sum(retform)
bv = self.ser.read(size=n)
# split return bytes value
pos = 0
ret = []
for size in retform:
ret.append(int.from_bytes(bv[pos:pos+size], signed=False, byteorder='little'))
pos += size
return ret
def _Request(self, command, retform):
self._CommandReq(command)
return self._CommandRsp(retform)
def GetFirmwareVersion(self):
self._CommandReq('S0')
n = self._GetUINT8()
d = [self._GetUINT8() for _ in range(n)]
return ''.join(chr(i) for i in d)
def GetBoardID(self):
self._CommandReq('SI')
return self._GetUINT32()
def SetBoardID(self, id):
self._CommandReq('SD')
self._PutUINT32(id)
def SelectChannel(self, n):
if n-1 in range(8):
self._CommandReq('12345678'[n-1])
def GetI(self):
self._CommandReq('I')
return self._GetINT32()
def GetV(self):
self._CommandReq('V')
return self._GetINT32()
def GetIV(self):
self._CommandReq('D')
return self._GetINT32(), self._GetINT32()
def ScanI(self, fs, n):
self._CommandReq('MI')
self._PutUINT8(fs)
self._PutUINT16(n)
N = self._GetUINT16()
i = np.zeros(N, dtype=int)
for k in range(N):
i[k] = self._GetINT32()
return i
def ScanV(self, fs, n):
self._CommandReq('MV')
self._PutUINT8(fs)
self._PutUINT16(n)
N = self._GetUINT16()
u = np.zeros(N, dtype=int)
for k in range(N):
u[k] = self._GetINT32()
return u
def ScanIV(self, fs, n):
self._CommandReq('MD')
self._PutUINT8(fs)
self._PutUINT16(n)
N = self._GetUINT16()
i = np.zeros(N, dtype=int)
u = np.zeros(N, dtype=int)
for k in range(N):
i[k], u[k] = self._Get2INT32()
return i, u
def GetError(self):
status = self._Request('SE', (2,))[0]
if status == 0: return 'Status Ok'
return f'Error {status:04X}'
def GetADCversion(self):
self._CommandReq('SV')
b = self.ser.read(size=8)
s = ('ADE9113', 'ADE9112', 'ADE9103')[b[1]] if b[1] <= 3 else '?'
if len(b) >= 8:
id = f'{b[2]:02X}{b[3]:02X}{b[4]:02X}{b[5]:02X}{b[6]:02X}{b[7]:02X}'
else:
id = '????'
adc = {'ver':s, 'rev':f'{b[0]:02X}', 'id':id}
return adc
def SetRawData(self, unit=False):
self._CommandReq('SU')
self._PutUINT8(1 if unit else 0)
def SetMeanCount(self, n):
self._CommandReq('SN')
self._PutUINT8(n)
def AdjustOffsetI(self):
self._CommandReq('SCI')
return self._GetINT32()
def AdjustOffsetV(self):
self._CommandReq('SCV')
return self._GetINT32()
def AdjustScaleI(self, value):
self._CommandReq('SAI')
self._PutINT32(value)
def AdjustScaleV(self, value):
self._CommandReq('SAV')
self._PutINT32(value)
def ADC_Transmit(self, txdata):
self._CommandReq('X1')
self._PutUINT32(txdata)
def ADC_Receive(self):
self._CommandReq('X2')
return self._GetUINT32()
def ADC_Transfer(self, txdata):
self._CommandReq('XT')
self._PutUINT32(txdata)
return self._GetUINT32()
def ADC_RegisterWrite(self, reg, value):
self._CommandReq('XW')
self._PutUINT8(reg)
self._PutUINT8(value)
def ADC_RegisterRead(self, reg):
self._CommandReq('XR')
self._PutUINT8(reg)
return self._CommandRsp((1,))[0]
def ADC_RegisterRead2(self, reg):
self._CommandReq('Xr')
self._PutUINT8(reg)
return self._CommandRsp((1,1))
def ReadADC_I(self):
self._CommandReq('XI')
return self._GetINT32()
def ReadADC_V(self):
self._CommandReq('XV')
return self._GetINT32()
def ReadADC_IV(self):
self._CommandReq('XD')
return self._Get2INT32()
def ADC_Init(self):
self._CommandReq('XC')
def ADC_Sleep(self, sleep=True):
self._CommandReq('XS')
self._PutINT8(1 if sleep else 0)
def AddCRC(self, d):
self._CommandReq('XX')
self._PutUINT32(d)
return self._CommandRsp((4,))[0]
def Echo(self, din):
self._CommandReq('XE')
self._PutUINT8(din)
return self._CommandRsp((1,))[0]
def Bias_Enable(self, on):
self._CommandReq('BC')
self._PutINT8(1 if on else 0)
def Bias_GetAll(self):
self._CommandReq('BS')
return self._CommandRsp((1,))[0]
def Bias_SetV(self, v=0.0):
self._CommandReq('BV')
self._PutINT16(int(v*1000))
def Bias_GetI(self):
self._CommandReq('BI')
return self._GetINT32()/1000000000.0
def HV_Enable(self, on):
self._CommandReq('LC')
self._PutINT8(1 if on else 0)
def HV_SetAll(self, on):
self._CommandReq('LA')
self._PutINT8(1 if on else 0)
def HV_GetAll(self):
self._CommandReq('LS')
return self._CommandRsp((1,))[0]
def HV_IsLocked(self):
self._CommandReq('SL')
return self._CommandRsp((1,))[0] != 0