RPi_ControlTec/canusb.py
2022-12-22 09:00:02 +01:00

172 lines
4.4 KiB
Python

# -*- coding: utf-8 -*-
import serial
from time import sleep
CANUSB_SPEED = {1000000:0x01,800000:0x02,500000:0x03,400000:0x04,250000:0x05,200000:0x06,125000:0x07,100000:0x08,50000:0x09,20000:0x0a,10000:0x0b,5000:0x0c}
CANUSB_MODE = {"NORMAL":0x00,"LOOPBACK":0x01,"SILENT":0x02,"LOOPBACK_SILENT":0x03}
CANUSB_FRAME = {"STANDARD":0x01,"EXTENDED":0x02}
class CANUSB:
def __init__(self, dev):
self.__ser = serial.Serial()
self.__ser_open(dev)
self.__frame = CANUSB_FRAME["STANDARD"]
self.__settings(CANUSB_SPEED[125000], CANUSB_MODE["NORMAL"], CANUSB_FRAME["STANDARD"])
def __del__(self):
self.__ser_close()
def __ser_open(self, dev, baudrate=2000000):
self.__ser.port = dev
self.__ser.baudrate = baudrate
self.__ser.timeout = 0
if not self.__ser_isopen():
self.__ser.open()
def __ser_isopen(self):
return self.__ser.is_open
def __ser_close(self):
if self.__ser_isopen():
self.__ser.close()
def __ser_write(self, data):
self.__ser.write(data)
def __ser_read(self):
return self.__ser.read()
def __settings(self, speed, mode, frame):
cmd_frame = []
cmd_frame.append(0xaa)
cmd_frame.append(0x55)
cmd_frame.append(0x12)
cmd_frame.append(speed)
cmd_frame.append(frame)
cmd_frame.append(0) # /* Filter ID not handled. */
cmd_frame.append(0) # /* Filter ID not handled. */
cmd_frame.append(0) # /* Filter ID not handled. */
cmd_frame.append(0) # /* Filter ID not handled. */
cmd_frame.append(0) # /* Mask ID not handled. */
cmd_frame.append(0) # /* Mask ID not handled. */
cmd_frame.append(0) # /* Mask ID not handled. */
cmd_frame.append(0) # /* Mask ID not handled. */
cmd_frame.append(mode)
cmd_frame.append(0x01)
cmd_frame.append(0)
cmd_frame.append(0)
cmd_frame.append(0)
cmd_frame.append(0)
cmd_frame.append(self.__generate_checksum(cmd_frame[2:19]))
self.__ser_write(cmd_frame)
#maybe sleep for 1 sec, because if you are too fast, it does nothing
def __generate_checksum(self, data):
checksum = 0
for x in range(len(data)):
checksum += data[x]
return checksum & 0xff
def __frame_is_complete(self, frame):
frame_len = len(frame)
if (frame_len > 0):
if (frame[0] != 0xaa):
# Need to sync on 0xaa at start of frames, so just skip.
return 0
if (frame_len < 2):
return 0
if (frame[1] == 0x55): # /* Command frame... */
if (frame_len >= 20):# /* ...always 20 bytes. */
return 1
else:
return 0
elif ((frame[1] >> 4) == 0xc): # /* Data frame... */
if (frame_len >= (frame[1] & 0xf) + 5): # /* ...payload and 5 bytes. */
return 1
else:
return 0
# /* Unhandled frame type. */
return 1
def send(self, id, data_in):
data_frame = []
data = bytearray(data_in)
data_len = len(data)
if(data_len < 0 or data_len > 8):
print("Data length must between 0 and 8")
print(f"Data is {data_len} Bytes long")
return -1
# /* Byte 0: Packet Start */
data_frame.append(0xaa)
# /* Byte 1: CAN Bus Data Frame Information */
data_frame.append(0x00)
data_frame[-1] |= 0xC0 # /* Bit 7 Always 1, Bit 6 Always 1 */
if (self.__frame == 0x01):
data_frame[-1] &= 0xDF # /* STD frame */
else: # /* CANUSB_FRAME_EXTENDED */
data_frame[-1] |= 0x20 # /* EXT frame */
data_frame[-1] &= 0xEF # /* 0=Data */
data_frame[-1] |= data_len # /* DLC=data_len */
# /* Byte 2 to 3: ID */
data_frame.append(id & 0xFF) # /* lsb */
data_frame.append((id >> 8) & 0xFF) # /* msb */
# /* Byte 4 to (4+data_len): Data */
for i in range(data_len):
data_frame.append(data[i])
# /* Last byte: End of frame */
data_frame.append(0x55)
self.__ser_write(data_frame)
return 0
def recv(self, timeout=1):
recv = True
frame = []
counter = 0
while(recv):
result = self.__ser_read()
if result != b'':
#print(result)
num = int.from_bytes(result, "big")
frame.append(num)
if self.__frame_is_complete(frame):
recv = 0
counter += 1
if counter > (100*timeout):
print("Timeout")
return -1
sleep(0.001)
frame_len = len(frame)
if ((frame_len == 20) and (frame[0] == 0xaa) and (frame[1] == 0x55)):
checksum = self.__generate_checksum(frame[2:19])
if (checksum != frame[frame_len - 1]):
print("CHECKSUM ERROR")
return -1
if((frame_len >= 6) and (frame[0] == 0xaa) and ((frame[1] >> 4) == 0xc)):
id = frame[3]<<8 | frame[2]
data = []
for x in range(4,frame_len-1):
data.append(frame[x])
return id, data
return -1