172 lines
4.4 KiB
Python
172 lines
4.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import serial
|
|
from time import sleep
|
|
|
|
CANUSB_SPEED = {1000000:0x01,800000:0x02,500000:0x03,400000:0x04,250000:0x05,200000:0x06,125000:0x07,100000:0x08,50000:0x09,20000:0x0a,10000:0x0b,5000:0x0c}
|
|
CANUSB_MODE = {"NORMAL":0x00,"LOOPBACK":0x01,"SILENT":0x02,"LOOPBACK_SILENT":0x03}
|
|
CANUSB_FRAME = {"STANDARD":0x01,"EXTENDED":0x02}
|
|
|
|
class CANUSB:
|
|
def __init__(self, dev):
|
|
self.__ser = serial.Serial()
|
|
self.__ser_open(dev)
|
|
self.__frame = CANUSB_FRAME["STANDARD"]
|
|
self.__settings(CANUSB_SPEED[125000], CANUSB_MODE["NORMAL"], CANUSB_FRAME["STANDARD"])
|
|
|
|
def __del__(self):
|
|
self.__ser_close()
|
|
|
|
def __ser_open(self, dev, baudrate=2000000):
|
|
self.__ser.port = dev
|
|
self.__ser.baudrate = baudrate
|
|
self.__ser.timeout = 0
|
|
if not self.__ser_isopen():
|
|
self.__ser.open()
|
|
|
|
def __ser_isopen(self):
|
|
return self.__ser.is_open
|
|
|
|
def __ser_close(self):
|
|
if self.__ser_isopen():
|
|
self.__ser.close()
|
|
|
|
def __ser_write(self, data):
|
|
self.__ser.write(data)
|
|
|
|
def __ser_read(self):
|
|
return self.__ser.read()
|
|
|
|
def __settings(self, speed, mode, frame):
|
|
cmd_frame = []
|
|
cmd_frame.append(0xaa)
|
|
cmd_frame.append(0x55)
|
|
cmd_frame.append(0x12)
|
|
cmd_frame.append(speed)
|
|
cmd_frame.append(frame)
|
|
cmd_frame.append(0) # /* Filter ID not handled. */
|
|
cmd_frame.append(0) # /* Filter ID not handled. */
|
|
cmd_frame.append(0) # /* Filter ID not handled. */
|
|
cmd_frame.append(0) # /* Filter ID not handled. */
|
|
cmd_frame.append(0) # /* Mask ID not handled. */
|
|
cmd_frame.append(0) # /* Mask ID not handled. */
|
|
cmd_frame.append(0) # /* Mask ID not handled. */
|
|
cmd_frame.append(0) # /* Mask ID not handled. */
|
|
cmd_frame.append(mode)
|
|
cmd_frame.append(0x01)
|
|
cmd_frame.append(0)
|
|
cmd_frame.append(0)
|
|
cmd_frame.append(0)
|
|
cmd_frame.append(0)
|
|
cmd_frame.append(self.__generate_checksum(cmd_frame[2:19]))
|
|
self.__ser_write(cmd_frame)
|
|
#maybe sleep for 1 sec, because if you are too fast, it does nothing
|
|
|
|
def __generate_checksum(self, data):
|
|
checksum = 0
|
|
for x in range(len(data)):
|
|
checksum += data[x]
|
|
return checksum & 0xff
|
|
|
|
def __frame_is_complete(self, frame):
|
|
frame_len = len(frame)
|
|
|
|
if (frame_len > 0):
|
|
if (frame[0] != 0xaa):
|
|
# Need to sync on 0xaa at start of frames, so just skip.
|
|
return 0
|
|
|
|
if (frame_len < 2):
|
|
return 0
|
|
|
|
if (frame[1] == 0x55): # /* Command frame... */
|
|
if (frame_len >= 20):# /* ...always 20 bytes. */
|
|
return 1
|
|
else:
|
|
return 0
|
|
|
|
elif ((frame[1] >> 4) == 0xc): # /* Data frame... */
|
|
if (frame_len >= (frame[1] & 0xf) + 5): # /* ...payload and 5 bytes. */
|
|
return 1
|
|
else:
|
|
return 0
|
|
|
|
# /* Unhandled frame type. */
|
|
return 1
|
|
|
|
def send(self, id, data_in):
|
|
data_frame = []
|
|
|
|
data = bytearray(data_in)
|
|
data_len = len(data)
|
|
|
|
if(data_len < 0 or data_len > 8):
|
|
print("Data length must between 0 and 8")
|
|
print(f"Data is {data_len} Bytes long")
|
|
return -1
|
|
|
|
# /* Byte 0: Packet Start */
|
|
data_frame.append(0xaa)
|
|
|
|
# /* Byte 1: CAN Bus Data Frame Information */
|
|
data_frame.append(0x00)
|
|
data_frame[-1] |= 0xC0 # /* Bit 7 Always 1, Bit 6 Always 1 */
|
|
if (self.__frame == 0x01):
|
|
data_frame[-1] &= 0xDF # /* STD frame */
|
|
else: # /* CANUSB_FRAME_EXTENDED */
|
|
data_frame[-1] |= 0x20 # /* EXT frame */
|
|
data_frame[-1] &= 0xEF # /* 0=Data */
|
|
data_frame[-1] |= data_len # /* DLC=data_len */
|
|
|
|
# /* Byte 2 to 3: ID */
|
|
data_frame.append(id & 0xFF) # /* lsb */
|
|
data_frame.append((id >> 8) & 0xFF) # /* msb */
|
|
|
|
# /* Byte 4 to (4+data_len): Data */
|
|
for i in range(data_len):
|
|
data_frame.append(data[i])
|
|
|
|
# /* Last byte: End of frame */
|
|
data_frame.append(0x55)
|
|
|
|
self.__ser_write(data_frame)
|
|
|
|
return 0
|
|
|
|
def recv(self, timeout=1):
|
|
|
|
recv = True
|
|
frame = []
|
|
counter = 0
|
|
|
|
while(recv):
|
|
result = self.__ser_read()
|
|
if result != b'':
|
|
#print(result)
|
|
num = int.from_bytes(result, "big")
|
|
|
|
frame.append(num)
|
|
if self.__frame_is_complete(frame):
|
|
recv = 0
|
|
|
|
counter += 1
|
|
if counter > (100*timeout):
|
|
print("Timeout")
|
|
return -1
|
|
sleep(0.001)
|
|
|
|
frame_len = len(frame)
|
|
if ((frame_len == 20) and (frame[0] == 0xaa) and (frame[1] == 0x55)):
|
|
checksum = self.__generate_checksum(frame[2:19])
|
|
if (checksum != frame[frame_len - 1]):
|
|
print("CHECKSUM ERROR")
|
|
return -1
|
|
|
|
if((frame_len >= 6) and (frame[0] == 0xaa) and ((frame[1] >> 4) == 0xc)):
|
|
id = frame[3]<<8 | frame[2]
|
|
data = []
|
|
for x in range(4,frame_len-1):
|
|
data.append(frame[x])
|
|
return id, data
|
|
return -1
|