init upload

This commit is contained in:
pique_n 2022-12-22 09:00:02 +01:00
commit dd5f511520
5 changed files with 855 additions and 0 deletions

8
ReadMe.md Normal file
View File

@ -0,0 +1,8 @@
# RPi Control Tec via Python and CAN
This Script helps you to Control the TEC via the CAN Bus directly.
It is outdated and not maintained anymore, because we have now [Tessie](https://github.com/ursl/tessie).
There are 2 options, via the USB to CAN adapter (from seeed studio) or via the can hat for the Raspberry PI
Because we don't need it anymore, i will not further document this, but i published it because may someone want to use it to control the TEC directly

171
canusb.py Normal file
View File

@ -0,0 +1,171 @@
# -*- coding: utf-8 -*-
import serial
from time import sleep
CANUSB_SPEED = {1000000:0x01,800000:0x02,500000:0x03,400000:0x04,250000:0x05,200000:0x06,125000:0x07,100000:0x08,50000:0x09,20000:0x0a,10000:0x0b,5000:0x0c}
CANUSB_MODE = {"NORMAL":0x00,"LOOPBACK":0x01,"SILENT":0x02,"LOOPBACK_SILENT":0x03}
CANUSB_FRAME = {"STANDARD":0x01,"EXTENDED":0x02}
class CANUSB:
def __init__(self, dev):
self.__ser = serial.Serial()
self.__ser_open(dev)
self.__frame = CANUSB_FRAME["STANDARD"]
self.__settings(CANUSB_SPEED[125000], CANUSB_MODE["NORMAL"], CANUSB_FRAME["STANDARD"])
def __del__(self):
self.__ser_close()
def __ser_open(self, dev, baudrate=2000000):
self.__ser.port = dev
self.__ser.baudrate = baudrate
self.__ser.timeout = 0
if not self.__ser_isopen():
self.__ser.open()
def __ser_isopen(self):
return self.__ser.is_open
def __ser_close(self):
if self.__ser_isopen():
self.__ser.close()
def __ser_write(self, data):
self.__ser.write(data)
def __ser_read(self):
return self.__ser.read()
def __settings(self, speed, mode, frame):
cmd_frame = []
cmd_frame.append(0xaa)
cmd_frame.append(0x55)
cmd_frame.append(0x12)
cmd_frame.append(speed)
cmd_frame.append(frame)
cmd_frame.append(0) # /* Filter ID not handled. */
cmd_frame.append(0) # /* Filter ID not handled. */
cmd_frame.append(0) # /* Filter ID not handled. */
cmd_frame.append(0) # /* Filter ID not handled. */
cmd_frame.append(0) # /* Mask ID not handled. */
cmd_frame.append(0) # /* Mask ID not handled. */
cmd_frame.append(0) # /* Mask ID not handled. */
cmd_frame.append(0) # /* Mask ID not handled. */
cmd_frame.append(mode)
cmd_frame.append(0x01)
cmd_frame.append(0)
cmd_frame.append(0)
cmd_frame.append(0)
cmd_frame.append(0)
cmd_frame.append(self.__generate_checksum(cmd_frame[2:19]))
self.__ser_write(cmd_frame)
#maybe sleep for 1 sec, because if you are too fast, it does nothing
def __generate_checksum(self, data):
checksum = 0
for x in range(len(data)):
checksum += data[x]
return checksum & 0xff
def __frame_is_complete(self, frame):
frame_len = len(frame)
if (frame_len > 0):
if (frame[0] != 0xaa):
# Need to sync on 0xaa at start of frames, so just skip.
return 0
if (frame_len < 2):
return 0
if (frame[1] == 0x55): # /* Command frame... */
if (frame_len >= 20):# /* ...always 20 bytes. */
return 1
else:
return 0
elif ((frame[1] >> 4) == 0xc): # /* Data frame... */
if (frame_len >= (frame[1] & 0xf) + 5): # /* ...payload and 5 bytes. */
return 1
else:
return 0
# /* Unhandled frame type. */
return 1
def send(self, id, data_in):
data_frame = []
data = bytearray(data_in)
data_len = len(data)
if(data_len < 0 or data_len > 8):
print("Data length must between 0 and 8")
print(f"Data is {data_len} Bytes long")
return -1
# /* Byte 0: Packet Start */
data_frame.append(0xaa)
# /* Byte 1: CAN Bus Data Frame Information */
data_frame.append(0x00)
data_frame[-1] |= 0xC0 # /* Bit 7 Always 1, Bit 6 Always 1 */
if (self.__frame == 0x01):
data_frame[-1] &= 0xDF # /* STD frame */
else: # /* CANUSB_FRAME_EXTENDED */
data_frame[-1] |= 0x20 # /* EXT frame */
data_frame[-1] &= 0xEF # /* 0=Data */
data_frame[-1] |= data_len # /* DLC=data_len */
# /* Byte 2 to 3: ID */
data_frame.append(id & 0xFF) # /* lsb */
data_frame.append((id >> 8) & 0xFF) # /* msb */
# /* Byte 4 to (4+data_len): Data */
for i in range(data_len):
data_frame.append(data[i])
# /* Last byte: End of frame */
data_frame.append(0x55)
self.__ser_write(data_frame)
return 0
def recv(self, timeout=1):
recv = True
frame = []
counter = 0
while(recv):
result = self.__ser_read()
if result != b'':
#print(result)
num = int.from_bytes(result, "big")
frame.append(num)
if self.__frame_is_complete(frame):
recv = 0
counter += 1
if counter > (100*timeout):
print("Timeout")
return -1
sleep(0.001)
frame_len = len(frame)
if ((frame_len == 20) and (frame[0] == 0xaa) and (frame[1] == 0x55)):
checksum = self.__generate_checksum(frame[2:19])
if (checksum != frame[frame_len - 1]):
print("CHECKSUM ERROR")
return -1
if((frame_len >= 6) and (frame[0] == 0xaa) and ((frame[1] >> 4) == 0xc)):
id = frame[3]<<8 | frame[2]
data = []
for x in range(4,frame_len-1):
data.append(frame[x])
return id, data
return -1

351
tec.py Normal file
View File

@ -0,0 +1,351 @@
# -*- coding: utf-8 -*-
# tectest.py
from time import sleep, time, localtime
from teccan import *
#from tecsim import *
#from tecusb import *
class TecTest:
def __init__(self):
self.__runTimeStart = 0
self.__running = False
self.__thread = None
def __del__(self):
self.stop()
def start(self, proc = None):
self.__runTimeStart = time()
if proc:
self.__running = True
self.__thread = Thread(target=proc).start()
def stop(self):
self.__running = False
if self.__thread: self.__thread.join(2)
def isRunning(self):
return self.__running
def elapsedTime(self):
return time() - self.__runTimeStart
class TecLog:
def __init__(self, filename = None):
self.__n = 0
self.__file = None
if filename:
self.open(filename)
def __del__(self):
self.close()
def open(self, filename):
if self.__file: # close an already open file
self.close()
self.__file = open(filename, 'w')
@staticmethod
def timestamp():
t = localtime(time())
# Format: 2021.10.04 07:50
return '{0:4d}.{1:02d}.{2:02d} {3:02d}:{4:02d}:{5:02d}'\
.format(t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec)
def close(self):
if self.__file: # if file open
self.__file.close()
self.__file = None
def write(self, msg):
if not self.__file:
return
self.__file.write(msg)
self.__n += 1
if self.__n > 10: # flush file buffer
self.__file.flush()
self.__n = 0
# === Test procedures ===============================================
CAN_Enable()
#tec5 = TEC(5)
#tec6 = TEC(6)
#tec7 = TEC(7)
#tec8 = TEC(8)
test = TecTest()
log = TecLog()
# CAN_Disable()
tecnum = [1,2,3,4,5,6,7,8]
tecs = []
for num in tecnum:
tecs.append(TEC(num))
def tecall(func, var1 = None, var2 = None, var3 = None):
ret = []
for tec in tecs:
ret.append(func(tec, var1, var2, var3))
sleep(0.01)
return ret
def talkingSleep(sec):
n = sec // 10; # floor integer division
r = sec - n*10
for i in range(n):
if not test.isRunning(): return False
sleep(10)
print('elapsed time: {:0.1f}'.format(test.elapsedTime()))
if r > 0.01: sleep(r)
return True
# --- PID -----------------------------------------------------------
def _pidLoop():
global tec, test, log, T_set
#kp = 1.2
#ki = 0.028
umax = 10.0
umin = -2.0
kp = 1.2
ki = 0.03
kd = 0.0
dT = 0.5
y_int = 0.0
for tec in tecs:
tec.setUout(0)
sleep(0.01)
tec.pon()
sleep(0.01)
while test.isRunning():
for tec in tecs:
Tc, Th = tec.getTemp()
sleep(0.01)
# -error
x = Tc[0] - Tset
# proportional term
yp = kp*x
# integral term
y_int += x*dT
yi = ki*y_int
# derivative term
yd = kd*(x-tec.x_last)/dT
tec.x_last = x
# total
y = yp + yi + yd
# limitter
if y > umax:
y = umax
elif y < umin:
y = umin
tec.setUout(y)
sleep(0.01)
sleep(dT)
for tec in tecs:
tec.poff()
sleep(0.01)
def pidTest(kp=0.0, ki=0.0, kd=0.0):
global tec, test, log, Tset
log.open('pid_0001.txt')
#test.start(_pidLoop)
Tset = -25.0
#kp = 0.17
#ki = 0.015
#kd = 0.25
tecall(TEC.setTemp, Tset)
tecall(TEC.setPID, kp, ki, kd)
tecall(TEC.pon)
tecall(TEC.mode, 0)
print("start")
for t in range(120):
if t == 60:
Tset = -10.0
#tecall(TEC.setTemp, Tset)
#t = test.elapsedTime()
for tec in tecs:
Ui, Ii, Uo, Io = tec.getUI()
sleep(0.01)
Tc, Th = tec.getTemp()
log.write('{:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.2f}, {:0.2f}, '.format(t/2, Ui[0], Ii[0], Uo[0], Io[0], Tc[0], Th[0]))
sleep(0.01)
log.write('\n')
sleep(0.5)
#test.stop()
tecall(TEC.mode, 1)
tecall(TEC.poff)
print("finished")
log.close()
# --- Step response test --------------------------------------------
def _observeTec():
global tec, test, log
log.open('tecstep_102.txt')
while test.isRunning():
t = test.elapsedTime()
for tec in tecs:
Ui, Ii, Uo, Io = tec.getUI()
Tc, Th = tec.getTemp()
log.write('{:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.2f}, {:0.2f}, '.format(t, Ui[0], Ii[0], Uo[0], Io[0], Tc[0], Th[0]))
sleep(0.01)
log.write('\n')
sleep(0.5)
#if Th[0] > 35: # overheated
# for tec in tecs:
# tec.poff()
# test.stop()
# print('Peltier element overheated! Test aborted.')
log.close()
def runStepresponse(u_out, sec):
global test, tec
test.start(_observeTec)
print('test running: ...')
print('Uout = {:0.3}V'.format(float(u_out)))
for tec in tecs:
tec.setUout(u_out)
sleep(0.01)
tec.pon()
sleep(0.01)
if not talkingSleep(sec): return
print('TEC power off')
for tec in tecs:
tec.poff()
sleep(0.01)
if not talkingSleep(sec): return
test.stop()
print('test ended after {:0.1f}s runtime'.format(test.elapsedTime()))
sleep(1)
# --- Peltier stress test -------------------------------------------
def _tecCycling():
global test, log, tec
n = 0
cooling = False
while test.isRunning():
Ui, Ii, Uo, Io = tec.getUI()
Tc, Th = tec.getTemp()
# log and change temperature every 6th event (60 s interval)
n += 1
if n >= 60:
n = 0
log.write('{}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.2f}, {:0.2f}\n'.\
format(TecLog.timestamp(), Ui, Ii, Uo, Io, Tc, Th))
print('Io:{:5.2f}A; Tc:{:5.1f}°C; Th:{:5.1f}°C'.format(Io, Tc, Th))
if cooling:
tec.poff()
cooling = False
else:
tec.pon()
cooling = True
# check for overheating every 1 s
if Th > 30:
tec.poff()
test.stop()
print('Peltier element overheated! Test aborted at {}'.format(TecLog.Timestamp()))
sleep(1)
def startTest():
global test, log
log.open('teclog_0004.txt')
test.start(_tecCycling)
tec.setUout(10)
print('Peltier stress test started at {}'.format(TecLog.timestamp()))
def stopTest():
global test, log, tec
test.stop()
log.close()
tec.poff()
print('Peltier stress test stopped at {}.'.format(TecLog.timestamp()))
sleep(1)
def vout(v):
global tec
tec.setUout(v)
tec.pon()
sleep(4)
ui, ii, uo, io = tec.getUI()
print('{:0.3f}V {:0.3f}A {:0.3f}V {:0.3f}A'.format(ui, ii, uo, io))
sleep(0.5)
ui, ii, uo, io = tec.getUI()
print('{:0.3f}V {:0.3f}A {:0.3f}V {:0.3f}A'.format(ui, ii, uo, io))
sleep(0.5)
ui, ii, uo, io = tec.getUI()
print('{:0.3f}V {:0.3f}A {:0.3f}V {:0.3f}A'.format(ui, ii, uo, io))
tc, th = tec.getTemp()
print('{:0.2f}°C {:0.2f}°C'.format(tc, th))
tec.poff()
# --- ADC Test histogram ------------------------------------------
def adctest(n, uout=5):
global test, log
log.open('adctest.txt')
tec.setUout(uout)
tec.pon()
sleep(120)
for i in range(n):
Ui, Ii, Uo, Io = tec.getUI()
log.write('{:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}\n'.format(Ui[0], Ii[0], Uo[0], Io[0]))
#sleep(0.01)
tec.poff()
log.close()
def rawadctest(n, uout=5):
global test, log
log.open('rawadctest.txt')
tec.setUout(uout)
tec.pon()
sleep(120)
for i in range(n):
Ui, Ii, Uo, Io = tec.rawgetUI()
log.write('{}, {}, {}, {}\n'.\
format(Ui, Ii, Uo, Io))
#sleep(0.01)
tec.poff()
log.close()

181
teccan.py Normal file
View File

@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
# tec.py
import os
from threading import Thread, Lock
from time import sleep
import can
import struct
def CAN_Enable():
#os.system('sudo ip link set can0 type can bitrate 125000')
pass #os.system('sudo ifconfig can0 up')
def CAN_Disable():
os.system('sudo ifconfig can0 down')
class CAN: # thread safe
def __init__(self):
self.__can0 = None
self.__lock = Lock()
self.connect()
def connect(self):
self.__can0 = can.interface.Bus(channel = 'can0', bustype = 'socketcan')
@staticmethod
def toBytes(x : int):
return x.to_bytes(2, byteorder='big', signed=True)
@staticmethod
def toInt(b) -> int:
return int.from_bytes(b, byteorder='big', signed=True)
@staticmethod
def toFloat(b) -> float:
a = bytearray(b)
return struct.unpack('<f', a)
@staticmethod
def fromFloat(x : float):
return list(struct.pack('<f', x))
def __send(self, id, data=[]):
if not self.__can0: return
msg = can.Message(arbitration_id=id, data=data, is_extended_id=False)
self.__can0.send(msg)
def send(self, id, data=[]): # thread safe
self.__lock.acquire()
self.__send(id, data=data)
self.__lock.release()
def __receive(self, timeout=2.0):
if not self.__can0: return -1, []
msg = self.__can0.recv(2.0)
if not msg: return -1, []
return msg.arbitration_id, msg.data
def receive(self, timeout=2.0): # thread safe
self.__lock.acquire()
ret = self.__receive(timeout)
self.__lock.release()
return ret
def request(self, id_send, data_send, timeout=2.0): # thread safe
self.__lock.acquire()
self.__send(id_send, data_send)
notFound = True
id = None
data = None
while notFound:
id, data = self.__receive(timeout)
if id == id_send | (1<<6):
notFound = False
self.__lock.release()
return id, data
# --- Cold box functions -------------------------------------------
class TEC:
def __init__(self, id):
self.can = CAN()
self.id = id
def __write(self, register, data):
data_out = [register]
data_out.extend(data)
self.can.send(0x320 | self.id,data_out)
def __cmd(self, command):
self.can.send(0x300 | self.id,[command])
def __read(self, register):
id, data = self.can.request(0x310 | self.id,[register])
return id, data
def stop(self):
pass
def pon(self, *args):
self.__cmd(1)
def poff(self, *args):
self.__cmd(2)
def reset(self, *args):
self.__cmd(255)
def save(self, *args):
self.__cmd(7)
def load(self, *args):
self.__cmd(8)
def mode(self, mode, *args):
self.__write(0, CAN.toBytes(mode))
def setUout(self, v, *args):
self.__write(1, CAN.fromFloat(v))
def getUI(self, *args):
id, data = self.__read(15)
U_in = CAN.toFloat(data[1:5])
id, data = self.__read(16)
I_in = CAN.toFloat(data[1:5])
id, data = self.__read(11)
U_out = CAN.toFloat(data[1:5])
id, data = self.__read(12)
I_out = CAN.toFloat(data[1:5])
return U_in, I_in, U_out, I_out
def rawgetUI(self, *args):
id, data = self.can.request(19)
U_in = CAN.toInt(data[0:2])
I_in = CAN.toInt(data[2:4])
U_out = CAN.toInt(data[4:6])
I_out = CAN.toInt(data[6:8])
return U_in, I_in, U_out, I_out
def getTemp(self, *args):
id, data = self.__read(9)
T_cold = CAN.toFloat(data[1:5])
id, data = self.__read(8)
T_hot = CAN.toFloat(data[1:5])
return T_cold, T_hot
def setPID(self, kp=0, ki=0, kd=0, *args):
self.__write(2, CAN.fromFloat(kp))
sleep(0.01)
self.__write(3, CAN.fromFloat(ki))
sleep(0.01)
self.__write(4, CAN.fromFloat(kd))
sleep(0.01)
def getPID(self):
id, data = self.__read(2)
kp = CAN.toFloat(data[1:5])[0]
sleep(0.01)
id, data = self.__read(3)
ki = CAN.toFloat(data[1:5])[0]
sleep(0.01)
id, data = self.__read(4)
kd = CAN.toFloat(data[1:5])[0]
sleep(0.01)
return kp, ki, kd
def getUref(self):
id, data = self.__read(20)
u = CAN.toFloat(data[1:5])[0]
return u
def setUref(self, u=0.0, *args):
self.__write(20, CAN.fromFloat(u))
def setTemp(self, temp, *args):
self.__write(5, CAN.fromFloat(temp))

144
tecusb.py Normal file
View File

@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-
# tec.py
import os
from threading import Thread, Lock
from time import sleep
from canusb import *
import struct
def CAN_Enable():
pass
def CAN_Disable():
pass
class CAN: # thread safe
def __init__(self):
self.__can0 = None
self.__lock = Lock()
self.connect()
def connect(self):
self.__can0 = CANUSB("/dev/ttyUSB0")
@staticmethod
def toBytes(x : int):
return x.to_bytes(4, byteorder='big', signed=False)
@staticmethod
def toInt(b) -> int:
return int.from_bytes(b, byteorder='big', signed=True)
@staticmethod
def toFloat(b) -> float:
a = bytearray(b)
return struct.unpack('<f', a)
@staticmethod
def fromFloat(x : float):
return list(struct.pack('<f', x))
def __send(self, id, data):
if not self.__can0: return
self.__can0.send(id, data)
def send(self, id, data): # thread safe
self.__lock.acquire()
self.__send(id, data)
self.__lock.release()
def __receive(self, timeout=2):
if not self.__can0: return -1, []
id, data = self.__can0.recv(timeout)
if id: return -1, []
return id, data
def receive(self, timeout=2): # thread safe
self.__lock.acquire()
ret = self.__receive(timeout)
self.__lock.release()
return ret
def request(self, id, data_send, timeout=2): # thread safe
self.__lock.acquire()
self.__send(id, data_send)
id, data = self.__receive(timeout)
self.__lock.release()
return id, data
# --- Cold box functions -------------------------------------------
class TEC:
def __init__(self, id):
self.can = CAN()
self.id = id
def __write(self, register, data):
data_out = [register]
data_out.extend(data)
self.can.send(0x120 | self.id,data_out)
def __cmd(self, command):
self.can.send(0x100 | self.id,[command])
def __read(self, register):
id, data = self.can.request(0x110 | self.id,[register])
return id, data
def stop(self):
pass
def pon(self, *args):
self.__cmd(1)
def poff(self, *args):
self.__cmd(2)
def mode(self, mode, *args):
self.__write(0, CAN.toBytes(mode))
def setUout(self, v, *args):
self.__write(1, CAN.fromFloat(v))
def getUI(self, *args):
id, data = self.__read(15)
U_in = CAN.toFloat(data[1:5])
id, data = self.__read(16)
I_in = CAN.toFloat(data[1:5])
id, data = self.__read(11)
U_out = CAN.toFloat(data[1:5])
id, data = self.__read(12)
I_out = CAN.toFloat(data[1:5])
return U_in, I_in, U_out, I_out
def rawgetUI(self, *args):
id, data = self.can.request(19)
U_in = CAN.toInt(data[0:2])
I_in = CAN.toInt(data[2:4])
U_out = CAN.toInt(data[4:6])
I_out = CAN.toInt(data[6:8])
return U_in, I_in, U_out, I_out
def getTemp(self, *args):
id, data = self.__read(9)
T_cold = CAN.toFloat(data[1:5])
id, data = self.__read(8)
T_hot = CAN.toFloat(data[1:5])
return T_cold, T_hot
def setPID(self, kp=0, ki=0, kd=0, *args):
self.__write(2, CAN.fromFloat(kp))
sleep(0.01)
self.__write(3, CAN.fromFloat(ki))
sleep(0.01)
self.__write(4, CAN.fromFloat(kd))
sleep(0.01)
def setTemp(self, temp, *args):
self.__write(5, CAN.fromFloat(temp))