144 lines
3.0 KiB
Python
144 lines
3.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
# tec.py
|
|
|
|
import os
|
|
from threading import Thread, Lock
|
|
from time import sleep
|
|
from canusb import *
|
|
import struct
|
|
|
|
|
|
def CAN_Enable():
    """Placeholder hook for enabling the CAN interface.

    Currently does nothing and returns ``None``.
    """
|
|
|
|
def CAN_Disable():
    """Placeholder hook for disabling the CAN interface.

    Currently does nothing and returns ``None``.
    """
|
|
|
|
|
|
class CAN:  # thread safe
    """Lock-protected wrapper around a ``CANUSB`` adapter.

    ``send``, ``receive`` and ``request`` serialise access to the adapter
    with a single lock, so one instance may be shared between threads.
    The static helpers convert between Python values and the byte
    sequences carried in a frame payload.
    """

    def __init__(self, port="/dev/ttyUSB0"):
        """Create the wrapper and open the adapter immediately.

        Args:
            port: Serial device handed to ``CANUSB``.
        """
        self.__can0 = None
        self.__lock = Lock()
        self.__port = port
        self.connect()

    def connect(self):
        """(Re)open the adapter on the configured serial port."""
        self.__can0 = CANUSB(self.__port)

    @staticmethod
    def toBytes(x: int):
        """Encode *x* as 4 big-endian unsigned bytes.

        Raises:
            OverflowError: if *x* is negative or does not fit in 32 bits.
        """
        return x.to_bytes(4, byteorder='big', signed=False)

    @staticmethod
    def toInt(b) -> int:
        """Decode a big-endian signed integer from the bytes-like *b*."""
        return int.from_bytes(b, byteorder='big', signed=True)

    @staticmethod
    def toFloat(b) -> float:
        """Decode a little-endian IEEE-754 single from 4 bytes.

        Args:
            b: ``bytes``, ``bytearray`` or a sequence of 4 ints in 0..255.

        Raises:
            struct.error: if *b* is not exactly 4 bytes long.
        """
        # struct.unpack always returns a tuple; callers want the scalar.
        # NOTE(review): floats are little-endian while toBytes/toInt are
        # big-endian -- confirm this matches the device protocol.
        (value,) = struct.unpack('<f', bytes(b))
        return value

    @staticmethod
    def fromFloat(x: float):
        """Encode *x* as a list of 4 ints (little-endian IEEE-754 single)."""
        return list(struct.pack('<f', x))

    def __send(self, id, data):
        """Send one frame; silently does nothing when no adapter is open.

        The caller must already hold the lock.
        """
        if not self.__can0:
            return
        self.__can0.send(id, data)

    def send(self, id, data):  # thread safe
        """Send one frame with identifier *id* and payload *data*."""
        # The context manager releases the lock even if the driver raises,
        # so a failed send cannot block every later caller.
        with self.__lock:
            self.__send(id, data)

    def __receive(self, timeout=2):
        """Receive one frame; the caller must already hold the lock.

        Returns:
            ``(id, data)`` for a received frame, or ``(-1, [])`` when no
            adapter is open or no frame was obtained.
        """
        if not self.__can0:
            return -1, []
        id, data = self.__can0.recv(timeout)
        # A falsy id is taken to mean that no frame arrived.
        # NOTE(review): confirm against CANUSB.recv's failure convention.
        if not id:
            return -1, []
        return id, data

    def receive(self, timeout=2):  # thread safe
        """Wait up to *timeout* for a frame.

        Returns:
            ``(id, data)``, or ``(-1, [])`` when nothing was received.
        """
        with self.__lock:
            return self.__receive(timeout)

    def request(self, id, data_send, timeout=2):  # thread safe
        """Send a frame and wait for the next received frame.

        The lock is held across both steps so another thread cannot slip
        its own traffic between the request and its reply.

        Returns:
            ``(id, data)`` of the reply, or ``(-1, [])`` when nothing was
            received.
        """
        with self.__lock:
            self.__send(id, data_send)
            return self.__receive(timeout)
|
|
|
|
|
|
|
|
# --- Cold box functions -------------------------------------------
|
|
|
|
class TEC:
    """Client for one TEC controller addressed over the CAN bus.

    Every frame identifier is a base value OR-ed with the controller id:
    commands use ``0x100``, register reads ``0x110`` and register writes
    ``0x120``.  The public methods accept and ignore extra positional
    arguments (``*args``).
    """

    _CMD_BASE = 0x100
    _READ_BASE = 0x110
    _WRITE_BASE = 0x120

    def __init__(self, id, can=None):
        """Bind to controller *id*.

        Args:
            id: Controller id OR-ed into each frame identifier.
            can: Bus object providing ``send`` and ``request``.  When
                omitted a new ``CAN()`` is opened; pass a shared instance
                when several controllers sit behind one adapter.
        """
        self.can = CAN() if can is None else can
        self.id = id

    def __write(self, register, data):
        """Write *data* (an iterable of byte values) to *register*."""
        payload = [register]
        payload.extend(data)
        self.can.send(self._WRITE_BASE | self.id, payload)

    def __cmd(self, command):
        """Send the single-byte *command* code."""
        self.can.send(self._CMD_BASE | self.id, [command])

    def __read(self, register):
        """Request *register* and return the ``(id, data)`` reply."""
        return self.can.request(self._READ_BASE | self.id, [register])

    def __read_float(self, register):
        """Read *register* and decode payload bytes 1..4 via ``CAN.toFloat``."""
        _, data = self.__read(register)
        # Byte 0 of the reply is skipped.
        # NOTE(review): presumably it echoes the register number -- confirm.
        return CAN.toFloat(data[1:5])

    def stop(self):
        """Do nothing (placeholder)."""

    def pon(self, *args):
        """Send command code 1 (going by the name, power on)."""
        self.__cmd(1)

    def poff(self, *args):
        """Send command code 2 (going by the name, power off)."""
        self.__cmd(2)

    def mode(self, mode, *args):
        """Write the integer *mode* to register 0 as 4 big-endian bytes."""
        self.__write(0, CAN.toBytes(mode))

    def setUout(self, v, *args):
        """Write the float *v* to register 1."""
        self.__write(1, CAN.fromFloat(v))

    def getUI(self, *args):
        """Read registers 15, 16, 11 and 12, in that order.

        Returns:
            ``(U_in, I_in, U_out, I_out)`` as decoded by ``CAN.toFloat``.
        """
        U_in = self.__read_float(15)
        I_in = self.__read_float(16)
        U_out = self.__read_float(11)
        I_out = self.__read_float(12)
        return U_in, I_in, U_out, I_out

    def rawgetUI(self, *args):
        """Read register 19 and split its 8-byte payload into four ints.

        Returns:
            ``(U_in, I_in, U_out, I_out)``, each decoded by ``CAN.toInt``
            from two consecutive payload bytes.
        """
        # Goes through the same addressed read path as every other register.
        # NOTE(review): unlike the float registers, the payload is sliced
        # from byte 0 -- confirm the device sends no leading byte here.
        _, data = self.__read(19)
        U_in = CAN.toInt(data[0:2])
        I_in = CAN.toInt(data[2:4])
        U_out = CAN.toInt(data[4:6])
        I_out = CAN.toInt(data[6:8])
        return U_in, I_in, U_out, I_out

    def getTemp(self, *args):
        """Read register 9, then register 8.

        Returns:
            ``(T_cold, T_hot)`` as decoded by ``CAN.toFloat``.
        """
        T_cold = self.__read_float(9)
        T_hot = self.__read_float(8)
        return T_cold, T_hot

    def setPID(self, kp=0, ki=0, kd=0, *args):
        """Write *kp*, *ki* and *kd* to registers 2, 3 and 4.

        Sleeps 10 ms after each write.
        """
        for register, gain in ((2, kp), (3, ki), (4, kd)):
            self.__write(register, CAN.fromFloat(gain))
            sleep(0.01)

    def setTemp(self, temp, *args):
        """Write the float *temp* to register 5."""
        self.__write(5, CAN.fromFloat(temp))