# -*- coding: utf-8 -*- import serial from time import sleep CANUSB_SPEED = {1000000:0x01,800000:0x02,500000:0x03,400000:0x04,250000:0x05,200000:0x06,125000:0x07,100000:0x08,50000:0x09,20000:0x0a,10000:0x0b,5000:0x0c} CANUSB_MODE = {"NORMAL":0x00,"LOOPBACK":0x01,"SILENT":0x02,"LOOPBACK_SILENT":0x03} CANUSB_FRAME = {"STANDARD":0x01,"EXTENDED":0x02} class CANUSB: def __init__(self, dev): self.__ser = serial.Serial() self.__ser_open(dev) self.__frame = CANUSB_FRAME["STANDARD"] self.__settings(CANUSB_SPEED[125000], CANUSB_MODE["NORMAL"], CANUSB_FRAME["STANDARD"]) def __del__(self): self.__ser_close() def __ser_open(self, dev, baudrate=2000000): self.__ser.port = dev self.__ser.baudrate = baudrate self.__ser.timeout = 0 if not self.__ser_isopen(): self.__ser.open() def __ser_isopen(self): return self.__ser.is_open def __ser_close(self): if self.__ser_isopen(): self.__ser.close() def __ser_write(self, data): self.__ser.write(data) def __ser_read(self): return self.__ser.read() def __settings(self, speed, mode, frame): cmd_frame = [] cmd_frame.append(0xaa) cmd_frame.append(0x55) cmd_frame.append(0x12) cmd_frame.append(speed) cmd_frame.append(frame) cmd_frame.append(0) # /* Filter ID not handled. */ cmd_frame.append(0) # /* Filter ID not handled. */ cmd_frame.append(0) # /* Filter ID not handled. */ cmd_frame.append(0) # /* Filter ID not handled. */ cmd_frame.append(0) # /* Mask ID not handled. */ cmd_frame.append(0) # /* Mask ID not handled. */ cmd_frame.append(0) # /* Mask ID not handled. */ cmd_frame.append(0) # /* Mask ID not handled. */ cmd_frame.append(mode) cmd_frame.append(0x01) cmd_frame.append(0) cmd_frame.append(0) cmd_frame.append(0) cmd_frame.append(0) cmd_frame.append(self.__generate_checksum(cmd_frame[2:19])) self.__ser_write(cmd_frame) #maybe sleep for 1 sec, because if you are too fast, it does nothing def __generate_checksum(self, data): checksum = 0 for x in range(len(data)): checksum += data[x] return checksum & 0xff def __frame_is_complete(self, frame): frame_len = len(frame) if (frame_len > 0): if (frame[0] != 0xaa): # Need to sync on 0xaa at start of frames, so just skip. return 0 if (frame_len < 2): return 0 if (frame[1] == 0x55): # /* Command frame... */ if (frame_len >= 20):# /* ...always 20 bytes. */ return 1 else: return 0 elif ((frame[1] >> 4) == 0xc): # /* Data frame... */ if (frame_len >= (frame[1] & 0xf) + 5): # /* ...payload and 5 bytes. */ return 1 else: return 0 # /* Unhandled frame type. */ return 1 def send(self, id, data_in): data_frame = [] data = bytearray(data_in) data_len = len(data) if(data_len < 0 or data_len > 8): print("Data length must between 0 and 8") print(f"Data is {data_len} Bytes long") return -1 # /* Byte 0: Packet Start */ data_frame.append(0xaa) # /* Byte 1: CAN Bus Data Frame Information */ data_frame.append(0x00) data_frame[-1] |= 0xC0 # /* Bit 7 Always 1, Bit 6 Always 1 */ if (self.__frame == 0x01): data_frame[-1] &= 0xDF # /* STD frame */ else: # /* CANUSB_FRAME_EXTENDED */ data_frame[-1] |= 0x20 # /* EXT frame */ data_frame[-1] &= 0xEF # /* 0=Data */ data_frame[-1] |= data_len # /* DLC=data_len */ # /* Byte 2 to 3: ID */ data_frame.append(id & 0xFF) # /* lsb */ data_frame.append((id >> 8) & 0xFF) # /* msb */ # /* Byte 4 to (4+data_len): Data */ for i in range(data_len): data_frame.append(data[i]) # /* Last byte: End of frame */ data_frame.append(0x55) self.__ser_write(data_frame) return 0 def recv(self, timeout=1): recv = True frame = [] counter = 0 while(recv): result = self.__ser_read() if result != b'': #print(result) num = int.from_bytes(result, "big") frame.append(num) if self.__frame_is_complete(frame): recv = 0 counter += 1 if counter > (100*timeout): print("Timeout") return -1 sleep(0.001) frame_len = len(frame) if ((frame_len == 20) and (frame[0] == 0xaa) and (frame[1] == 0x55)): checksum = self.__generate_checksum(frame[2:19]) if (checksum != frame[frame_len - 1]): print("CHECKSUM ERROR") return -1 if((frame_len >= 6) and (frame[0] == 0xaa) and ((frame[1] >> 4) == 0xc)): id = frame[3]<<8 | frame[2] data = [] for x in range(4,frame_len-1): data.append(frame[x]) return id, data return -1