RPi_ControlTec/teccan.py
2022-12-22 09:00:02 +01:00

181 lines
3.9 KiB
Python

# -*- coding: utf-8 -*-
# teccan.py
import os
from threading import Thread, Lock
from time import sleep
import can
import struct
def CAN_Enable():
    """Placeholder for bringing up the ``can0`` interface; does nothing.

    The shell commands that would set the bit rate and raise the link are
    disabled in this version, so calling this function has no effect.
    """
    # Disabled bring-up sequence, kept for reference:
    #   os.system('sudo ip link set can0 type can bitrate 125000')
    #   os.system('sudo ifconfig can0 up')
    return None
def CAN_Disable():
    """Take the ``can0`` network interface down by shelling out to ifconfig.

    The command is run through ``sudo``; its exit status is not inspected.
    """
    shutdown_cmd = 'sudo ifconfig can0 down'
    os.system(shutdown_cmd)
class CAN:  # thread safe
    """Lock-protected wrapper around the SocketCAN channel ``can0``.

    The public I/O methods (`send`, `receive`, `request`) hold an internal
    lock for the duration of the bus access, so one instance can be shared
    between threads. The lock is always released, even when the underlying
    bus raises.

    Failure convention: receive-type methods return ``(-1, [])`` when no
    bus is open or no (matching) frame arrived in time.
    """

    def __init__(self):
        """Create the lock and open the bus right away via `connect`."""
        self.__can0 = None
        self.__lock = Lock()
        self.connect()

    def connect(self):
        """Open channel ``can0`` using python-can's ``socketcan`` backend."""
        self.__can0 = can.interface.Bus(channel='can0', bustype='socketcan')

    @staticmethod
    def toBytes(x: int):
        """Encode *x* as a 2-byte big-endian signed integer."""
        return x.to_bytes(2, byteorder='big', signed=True)

    @staticmethod
    def toInt(b) -> int:
        """Decode the big-endian signed byte sequence *b* into an int."""
        return int.from_bytes(b, byteorder='big', signed=True)

    @staticmethod
    def toFloat(b) -> tuple:
        """Decode four little-endian bytes as a 32-bit float.

        The result is the 1-tuple produced by ``struct.unpack``; take
        ``[0]`` to obtain the bare float. The tuple is kept (rather than
        unwrapped here) because existing callers already index it.
        """
        return struct.unpack('<f', bytearray(b))

    @staticmethod
    def fromFloat(x: float):
        """Encode *x* as a little-endian 32-bit float, as a list of 4 ints."""
        return list(struct.pack('<f', x))

    def __send(self, id, data=None):
        """Transmit one frame with a non-extended identifier.

        Not locked; the caller must hold the lock. Does nothing when no bus
        is open. ``data=None`` means an empty payload.
        """
        if self.__can0 is None:
            return
        payload = [] if data is None else data
        msg = can.Message(arbitration_id=id, data=payload, is_extended_id=False)
        self.__can0.send(msg)

    def send(self, id, data=None):  # thread safe
        """Transmit one frame while holding the bus lock."""
        with self.__lock:
            self.__send(id, data=data)

    def __receive(self, timeout=2.0):
        """Wait up to *timeout* seconds for one frame.

        Not locked; the caller must hold the lock.

        Returns:
            ``(arbitration_id, data)`` of the received frame, or
            ``(-1, [])`` when no bus is open or nothing arrived in time.
        """
        if self.__can0 is None:
            return -1, []
        # Honour the caller's timeout instead of a hard-coded 2.0 s.
        msg = self.__can0.recv(timeout)
        if msg is None:
            return -1, []
        return msg.arbitration_id, msg.data

    def receive(self, timeout=2.0):  # thread safe
        """Receive one frame while holding the bus lock (see `__receive`)."""
        with self.__lock:
            return self.__receive(timeout)

    def request(self, id_send, data_send, timeout=2.0):  # thread safe
        """Send a frame and wait for the reply carrying the matching id.

        A reply matches when its identifier equals ``id_send | (1 << 6)``.
        Frames with any other identifier that arrive meanwhile are read and
        discarded.

        Args:
            id_send: identifier of the outgoing frame.
            data_send: payload of the outgoing frame.
            timeout: overall time budget in seconds for the reply;
                ``None`` waits without a deadline.

        Returns:
            ``(arbitration_id, data)`` of the reply, or ``(-1, [])`` when
            no matching frame arrived within *timeout* or no bus is open.
        """
        from time import monotonic

        expected_id = id_send | (1 << 6)
        with self.__lock:
            self.__send(id_send, data_send)
            deadline = None if timeout is None else monotonic() + timeout
            while True:
                if deadline is None:
                    remaining = None
                else:
                    remaining = max(0.0, deadline - monotonic())
                rx_id, rx_data = self.__receive(remaining)
                if rx_id == expected_id:
                    return rx_id, rx_data
                # -1 means the bus is closed or the wait timed out; either
                # way there is nothing more to read. Unrelated traffic must
                # not extend the wait beyond the overall deadline.
                if rx_id == -1:
                    return -1, []
                if deadline is not None and monotonic() >= deadline:
                    return -1, []
# --- Cold box functions -------------------------------------------
class TEC:
    """Client for one TEC controller reachable over the CAN bus.

    Each instance opens its own `CAN` connection. Frame identifiers are
    formed by OR-ing the controller id into a base value: ``0x300`` for
    commands, ``0x310`` for register reads and ``0x320`` for register
    writes. Register read replies carry the value in bytes 1..4 of the
    payload.

    The trailing ``*args`` accepted by most methods are ignored.
    """

    def __init__(self, id):
        """Open a CAN connection and remember the controller id."""
        self.can = CAN()
        self.id = id

    def __write(self, register, data):
        """Send a write frame: the register number followed by *data*."""
        data_out = [register]
        data_out.extend(data)
        self.can.send(0x320 | self.id, data_out)

    def __cmd(self, command):
        """Send a one-byte command frame."""
        self.can.send(0x300 | self.id, [command])

    def __read(self, register):
        """Request *register* and return the reply as ``(id, data)``."""
        return self.can.request(0x310 | self.id, [register])

    def __read_value(self, register):
        """Read *register* and decode payload bytes 1..4 with `CAN.toFloat`.

        Returns whatever `CAN.toFloat` returns, i.e. a 1-tuple.
        """
        _, data = self.__read(register)
        return CAN.toFloat(data[1:5])

    def stop(self):
        """Do nothing (placeholder)."""
        pass

    def pon(self, *args):
        """Send command code 1 (presumably power on; confirm with firmware)."""
        self.__cmd(1)

    def poff(self, *args):
        """Send command code 2 (presumably power off; confirm with firmware)."""
        self.__cmd(2)

    def reset(self, *args):
        """Send command code 255."""
        self.__cmd(255)

    def save(self, *args):
        """Send command code 7."""
        self.__cmd(7)

    def load(self, *args):
        """Send command code 8."""
        self.__cmd(8)

    def mode(self, mode, *args):
        """Write *mode* to register 0 as a 2-byte big-endian signed int."""
        self.__write(0, CAN.toBytes(mode))

    def setUout(self, v, *args):
        """Write *v* to register 1 as a 32-bit float."""
        self.__write(1, CAN.fromFloat(v))

    def getUI(self, *args):
        """Read registers 15, 16, 11 and 12, in that order.

        Returns:
            ``(U_in, I_in, U_out, I_out)``. Each element is the 1-tuple
            returned by `CAN.toFloat`, not a bare float; this shape is kept
            for existing callers.
        """
        U_in = self.__read_value(15)
        I_in = self.__read_value(16)
        U_out = self.__read_value(11)
        I_out = self.__read_value(12)
        return U_in, I_in, U_out, I_out

    def rawgetUI(self, *args):
        """Request frame id 19 and decode four big-endian signed 16-bit ints.

        Returns:
            ``(U_in, I_in, U_out, I_out)`` as ints taken from payload bytes
            0-1, 2-3, 4-5 and 6-7.
        """
        # `CAN.request` requires a payload argument; the call previously
        # omitted it and always raised TypeError.
        # NOTE(review): an empty payload on id 19 is assumed here; confirm
        # against the controller's protocol.
        _, data = self.can.request(19, [])
        U_in = CAN.toInt(data[0:2])
        I_in = CAN.toInt(data[2:4])
        U_out = CAN.toInt(data[4:6])
        I_out = CAN.toInt(data[6:8])
        return U_in, I_in, U_out, I_out

    def getTemp(self, *args):
        """Read register 9 (T_cold) and then register 8 (T_hot).

        Returns:
            ``(T_cold, T_hot)``. Each element is the 1-tuple returned by
            `CAN.toFloat`, not a bare float; this shape is kept for
            existing callers.
        """
        T_cold = self.__read_value(9)
        T_hot = self.__read_value(8)
        return T_cold, T_hot

    def setPID(self, kp=0, ki=0, kd=0, *args):
        """Write kp, ki and kd to registers 2, 3 and 4.

        A 10 ms pause follows every write.
        """
        for register, gain in ((2, kp), (3, ki), (4, kd)):
            self.__write(register, CAN.fromFloat(gain))
            sleep(0.01)

    def getPID(self):
        """Read registers 2, 3 and 4 and return ``(kp, ki, kd)`` as floats.

        A 10 ms pause follows every read.
        """
        gains = []
        for register in (2, 3, 4):
            gains.append(self.__read_value(register)[0])
            sleep(0.01)
        kp, ki, kd = gains
        return kp, ki, kd

    def getUref(self):
        """Read register 20 and return its value as a float."""
        return self.__read_value(20)[0]

    def setUref(self, u=0.0, *args):
        """Write *u* to register 20 as a 32-bit float."""
        self.__write(20, CAN.fromFloat(u))

    def setTemp(self, temp, *args):
        """Write *temp* to register 5 as a 32-bit float."""
        self.__write(5, CAN.fromFloat(temp))