From dd5f511520d1c228bf34e704c479d24837a4298a Mon Sep 17 00:00:00 2001 From: pique_n Date: Thu, 22 Dec 2022 09:00:02 +0100 Subject: [PATCH] init upload --- ReadMe.md | 8 ++ canusb.py | 171 ++++++++++++++++++++++++++ tec.py | 351 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ teccan.py | 181 ++++++++++++++++++++++++++++ tecusb.py | 144 ++++++++++++++++++++++ 5 files changed, 855 insertions(+) create mode 100644 ReadMe.md create mode 100644 canusb.py create mode 100644 tec.py create mode 100644 teccan.py create mode 100644 tecusb.py diff --git a/ReadMe.md b/ReadMe.md new file mode 100644 index 0000000..c2d490b --- /dev/null +++ b/ReadMe.md @@ -0,0 +1,8 @@ +# RPi Control Tec via Python and CAN + +This Script helps you to Control the TEC via the CAN Bus directly. +It is outdated and not maintained anymore, because we have now [Tessie](https://github.com/ursl/tessie). + +There are 2 options, via the USB to CAN adapter (from seeed studio) or via the can hat for the Raspberry PI + +Because we don't need it anymore, i will not further document this, but i published it because may someone want to use it to control the TEC directly \ No newline at end of file diff --git a/canusb.py b/canusb.py new file mode 100644 index 0000000..07f30fa --- /dev/null +++ b/canusb.py @@ -0,0 +1,171 @@ +# -*- coding: utf-8 -*- + +import serial +from time import sleep + +CANUSB_SPEED = {1000000:0x01,800000:0x02,500000:0x03,400000:0x04,250000:0x05,200000:0x06,125000:0x07,100000:0x08,50000:0x09,20000:0x0a,10000:0x0b,5000:0x0c} +CANUSB_MODE = {"NORMAL":0x00,"LOOPBACK":0x01,"SILENT":0x02,"LOOPBACK_SILENT":0x03} +CANUSB_FRAME = {"STANDARD":0x01,"EXTENDED":0x02} + +class CANUSB: + def __init__(self, dev): + self.__ser = serial.Serial() + self.__ser_open(dev) + self.__frame = CANUSB_FRAME["STANDARD"] + self.__settings(CANUSB_SPEED[125000], CANUSB_MODE["NORMAL"], CANUSB_FRAME["STANDARD"]) + + def __del__(self): + self.__ser_close() + + def __ser_open(self, dev, baudrate=2000000): + self.__ser.port = dev + self.__ser.baudrate = baudrate + self.__ser.timeout = 0 + if not self.__ser_isopen(): + self.__ser.open() + + def __ser_isopen(self): + return self.__ser.is_open + + def __ser_close(self): + if self.__ser_isopen(): + self.__ser.close() + + def __ser_write(self, data): + self.__ser.write(data) + + def __ser_read(self): + return self.__ser.read() + + def __settings(self, speed, mode, frame): + cmd_frame = [] + cmd_frame.append(0xaa) + cmd_frame.append(0x55) + cmd_frame.append(0x12) + cmd_frame.append(speed) + cmd_frame.append(frame) + cmd_frame.append(0) # /* Filter ID not handled. */ + cmd_frame.append(0) # /* Filter ID not handled. */ + cmd_frame.append(0) # /* Filter ID not handled. */ + cmd_frame.append(0) # /* Filter ID not handled. */ + cmd_frame.append(0) # /* Mask ID not handled. */ + cmd_frame.append(0) # /* Mask ID not handled. */ + cmd_frame.append(0) # /* Mask ID not handled. */ + cmd_frame.append(0) # /* Mask ID not handled. */ + cmd_frame.append(mode) + cmd_frame.append(0x01) + cmd_frame.append(0) + cmd_frame.append(0) + cmd_frame.append(0) + cmd_frame.append(0) + cmd_frame.append(self.__generate_checksum(cmd_frame[2:19])) + self.__ser_write(cmd_frame) + #maybe sleep for 1 sec, because if you are too fast, it does nothing + + def __generate_checksum(self, data): + checksum = 0 + for x in range(len(data)): + checksum += data[x] + return checksum & 0xff + + def __frame_is_complete(self, frame): + frame_len = len(frame) + + if (frame_len > 0): + if (frame[0] != 0xaa): + # Need to sync on 0xaa at start of frames, so just skip. + return 0 + + if (frame_len < 2): + return 0 + + if (frame[1] == 0x55): # /* Command frame... */ + if (frame_len >= 20):# /* ...always 20 bytes. */ + return 1 + else: + return 0 + + elif ((frame[1] >> 4) == 0xc): # /* Data frame... */ + if (frame_len >= (frame[1] & 0xf) + 5): # /* ...payload and 5 bytes. */ + return 1 + else: + return 0 + + # /* Unhandled frame type. */ + return 1 + + def send(self, id, data_in): + data_frame = [] + + data = bytearray(data_in) + data_len = len(data) + + if(data_len < 0 or data_len > 8): + print("Data length must between 0 and 8") + print(f"Data is {data_len} Bytes long") + return -1 + + # /* Byte 0: Packet Start */ + data_frame.append(0xaa) + + # /* Byte 1: CAN Bus Data Frame Information */ + data_frame.append(0x00) + data_frame[-1] |= 0xC0 # /* Bit 7 Always 1, Bit 6 Always 1 */ + if (self.__frame == 0x01): + data_frame[-1] &= 0xDF # /* STD frame */ + else: # /* CANUSB_FRAME_EXTENDED */ + data_frame[-1] |= 0x20 # /* EXT frame */ + data_frame[-1] &= 0xEF # /* 0=Data */ + data_frame[-1] |= data_len # /* DLC=data_len */ + + # /* Byte 2 to 3: ID */ + data_frame.append(id & 0xFF) # /* lsb */ + data_frame.append((id >> 8) & 0xFF) # /* msb */ + + # /* Byte 4 to (4+data_len): Data */ + for i in range(data_len): + data_frame.append(data[i]) + + # /* Last byte: End of frame */ + data_frame.append(0x55) + + self.__ser_write(data_frame) + + return 0 + + def recv(self, timeout=1): + + recv = True + frame = [] + counter = 0 + + while(recv): + result = self.__ser_read() + if result != b'': + #print(result) + num = int.from_bytes(result, "big") + + frame.append(num) + if self.__frame_is_complete(frame): + recv = 0 + + counter += 1 + if counter > (100*timeout): + print("Timeout") + return -1 + sleep(0.001) + + frame_len = len(frame) + if ((frame_len == 20) and (frame[0] == 0xaa) and (frame[1] == 0x55)): + checksum = self.__generate_checksum(frame[2:19]) + if (checksum != frame[frame_len - 1]): + print("CHECKSUM ERROR") + return -1 + + if((frame_len >= 6) and (frame[0] == 0xaa) and ((frame[1] >> 4) == 0xc)): + id = frame[3]<<8 | frame[2] + data = [] + for x in range(4,frame_len-1): + data.append(frame[x]) + return id, data + return -1 diff --git a/tec.py b/tec.py new file mode 100644 index 0000000..033ddd9 --- /dev/null +++ b/tec.py @@ -0,0 +1,351 @@ +# -*- coding: utf-8 -*- +# tectest.py + +from time import sleep, time, localtime +from teccan import * +#from tecsim import * +#from tecusb import * + + +class TecTest: + + def __init__(self): + self.__runTimeStart = 0 + self.__running = False + self.__thread = None + + def __del__(self): + self.stop() + + def start(self, proc = None): + self.__runTimeStart = time() + if proc: + self.__running = True + self.__thread = Thread(target=proc).start() + + def stop(self): + self.__running = False + if self.__thread: self.__thread.join(2) + + def isRunning(self): + return self.__running + + def elapsedTime(self): + return time() - self.__runTimeStart + + +class TecLog: + def __init__(self, filename = None): + self.__n = 0 + self.__file = None + if filename: + self.open(filename) + + def __del__(self): + self.close() + + def open(self, filename): + if self.__file: # close an already open file + self.close() + self.__file = open(filename, 'w') + + @staticmethod + def timestamp(): + t = localtime(time()) + # Format: 2021.10.04 07:50 + return '{0:4d}.{1:02d}.{2:02d} {3:02d}:{4:02d}:{5:02d}'\ + .format(t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec) + + def close(self): + if self.__file: # if file open + self.__file.close() + self.__file = None + + def write(self, msg): + if not self.__file: + return + self.__file.write(msg) + self.__n += 1 + if self.__n > 10: # flush file buffer + self.__file.flush() + self.__n = 0 + + +# === Test procedures =============================================== + +CAN_Enable() + +#tec5 = TEC(5) +#tec6 = TEC(6) +#tec7 = TEC(7) +#tec8 = TEC(8) + +test = TecTest() +log = TecLog() +# CAN_Disable() + +tecnum = [1,2,3,4,5,6,7,8] +tecs = [] +for num in tecnum: + tecs.append(TEC(num)) + + +def tecall(func, var1 = None, var2 = None, var3 = None): + ret = [] + for tec in tecs: + ret.append(func(tec, var1, var2, var3)) + sleep(0.01) + + return ret + + +def talkingSleep(sec): + n = sec // 10; # floor integer division + r = sec - n*10 + for i in range(n): + if not test.isRunning(): return False + sleep(10) + print('elapsed time: {:0.1f}'.format(test.elapsedTime())) + if r > 0.01: sleep(r) + return True + +# --- PID ----------------------------------------------------------- + + + +def _pidLoop(): + global tec, test, log, T_set + #kp = 1.2 + #ki = 0.028 + umax = 10.0 + umin = -2.0 + + kp = 1.2 + ki = 0.03 + kd = 0.0 + + dT = 0.5 + y_int = 0.0 + for tec in tecs: + tec.setUout(0) + sleep(0.01) + tec.pon() + sleep(0.01) + + while test.isRunning(): + for tec in tecs: + Tc, Th = tec.getTemp() + sleep(0.01) + # -error + x = Tc[0] - Tset + + # proportional term + yp = kp*x + + # integral term + y_int += x*dT + yi = ki*y_int + + # derivative term + yd = kd*(x-tec.x_last)/dT + tec.x_last = x + + # total + y = yp + yi + yd + + # limitter + if y > umax: + y = umax + elif y < umin: + y = umin + + tec.setUout(y) + sleep(0.01) + sleep(dT) + for tec in tecs: + tec.poff() + sleep(0.01) + + +def pidTest(kp=0.0, ki=0.0, kd=0.0): + global tec, test, log, Tset + log.open('pid_0001.txt') + #test.start(_pidLoop) + + Tset = -25.0 + #kp = 0.17 + #ki = 0.015 + #kd = 0.25 + + tecall(TEC.setTemp, Tset) + tecall(TEC.setPID, kp, ki, kd) + tecall(TEC.pon) + tecall(TEC.mode, 0) + + + print("start") + for t in range(120): + if t == 60: + Tset = -10.0 + #tecall(TEC.setTemp, Tset) + #t = test.elapsedTime() + for tec in tecs: + Ui, Ii, Uo, Io = tec.getUI() + sleep(0.01) + Tc, Th = tec.getTemp() + log.write('{:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.2f}, {:0.2f}, '.format(t/2, Ui[0], Ii[0], Uo[0], Io[0], Tc[0], Th[0])) + sleep(0.01) + log.write('\n') + sleep(0.5) + #test.stop() + tecall(TEC.mode, 1) + tecall(TEC.poff) + print("finished") + log.close() + +# --- Step response test -------------------------------------------- + +def _observeTec(): + global tec, test, log + log.open('tecstep_102.txt') + while test.isRunning(): + t = test.elapsedTime() + for tec in tecs: + Ui, Ii, Uo, Io = tec.getUI() + Tc, Th = tec.getTemp() + log.write('{:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.2f}, {:0.2f}, '.format(t, Ui[0], Ii[0], Uo[0], Io[0], Tc[0], Th[0])) + sleep(0.01) + log.write('\n') + sleep(0.5) + #if Th[0] > 35: # overheated + # for tec in tecs: + # tec.poff() + # test.stop() + # print('Peltier element overheated! Test aborted.') + log.close() + + +def runStepresponse(u_out, sec): + global test, tec + test.start(_observeTec) + print('test running: ...') + + print('Uout = {:0.3}V'.format(float(u_out))) + for tec in tecs: + tec.setUout(u_out) + sleep(0.01) + tec.pon() + sleep(0.01) + + if not talkingSleep(sec): return + + print('TEC power off') + for tec in tecs: + tec.poff() + sleep(0.01) + + if not talkingSleep(sec): return + + test.stop() + print('test ended after {:0.1f}s runtime'.format(test.elapsedTime())) + sleep(1) + + +# --- Peltier stress test ------------------------------------------- + +def _tecCycling(): + global test, log, tec + n = 0 + cooling = False + while test.isRunning(): + Ui, Ii, Uo, Io = tec.getUI() + Tc, Th = tec.getTemp() + + # log and change temperature every 6th event (60 s interval) + n += 1 + if n >= 60: + n = 0 + log.write('{}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}, {:0.2f}, {:0.2f}\n'.\ + format(TecLog.timestamp(), Ui, Ii, Uo, Io, Tc, Th)) + print('Io:{:5.2f}A; Tc:{:5.1f}°C; Th:{:5.1f}°C'.format(Io, Tc, Th)) + if cooling: + tec.poff() + cooling = False + else: + tec.pon() + cooling = True + + # check for overheating every 1 s + if Th > 30: + tec.poff() + test.stop() + print('Peltier element overheated! Test aborted at {}'.format(TecLog.Timestamp())) + + sleep(1) + + +def startTest(): + global test, log + log.open('teclog_0004.txt') + test.start(_tecCycling) + tec.setUout(10) + print('Peltier stress test started at {}'.format(TecLog.timestamp())) + + +def stopTest(): + global test, log, tec + test.stop() + log.close() + tec.poff() + print('Peltier stress test stopped at {}.'.format(TecLog.timestamp())) + sleep(1) + +def vout(v): + global tec + tec.setUout(v) + tec.pon() + sleep(4) + ui, ii, uo, io = tec.getUI() + print('{:0.3f}V {:0.3f}A {:0.3f}V {:0.3f}A'.format(ui, ii, uo, io)) + sleep(0.5) + ui, ii, uo, io = tec.getUI() + print('{:0.3f}V {:0.3f}A {:0.3f}V {:0.3f}A'.format(ui, ii, uo, io)) + sleep(0.5) + ui, ii, uo, io = tec.getUI() + print('{:0.3f}V {:0.3f}A {:0.3f}V {:0.3f}A'.format(ui, ii, uo, io)) + tc, th = tec.getTemp() + print('{:0.2f}°C {:0.2f}°C'.format(tc, th)) + tec.poff() + +# --- ADC Test histogram ------------------------------------------ + +def adctest(n, uout=5): + global test, log + log.open('adctest.txt') + tec.setUout(uout) + tec.pon() + sleep(120) + + for i in range(n): + Ui, Ii, Uo, Io = tec.getUI() + log.write('{:0.3f}, {:0.3f}, {:0.3f}, {:0.3f}\n'.format(Ui[0], Ii[0], Uo[0], Io[0])) + #sleep(0.01) + + tec.poff() + log.close() + +def rawadctest(n, uout=5): + global test, log + log.open('rawadctest.txt') + tec.setUout(uout) + tec.pon() + sleep(120) + + for i in range(n): + Ui, Ii, Uo, Io = tec.rawgetUI() + log.write('{}, {}, {}, {}\n'.\ + format(Ui, Ii, Uo, Io)) + #sleep(0.01) + + tec.poff() + log.close() diff --git a/teccan.py b/teccan.py new file mode 100644 index 0000000..873e258 --- /dev/null +++ b/teccan.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- +# tec.py + +import os +from threading import Thread, Lock +from time import sleep +import can +import struct + + +def CAN_Enable(): + #os.system('sudo ip link set can0 type can bitrate 125000') + pass #os.system('sudo ifconfig can0 up') + + +def CAN_Disable(): + os.system('sudo ifconfig can0 down') + + +class CAN: # thread safe + def __init__(self): + self.__can0 = None + self.__lock = Lock() + self.connect() + + def connect(self): + self.__can0 = can.interface.Bus(channel = 'can0', bustype = 'socketcan') + + @staticmethod + def toBytes(x : int): + return x.to_bytes(2, byteorder='big', signed=True) + + @staticmethod + def toInt(b) -> int: + return int.from_bytes(b, byteorder='big', signed=True) + + @staticmethod + def toFloat(b) -> float: + a = bytearray(b) + return struct.unpack(' int: + return int.from_bytes(b, byteorder='big', signed=True) + + @staticmethod + def toFloat(b) -> float: + a = bytearray(b) + return struct.unpack('