diff --git a/README.md b/README.md index 606490c..613e577 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,10 @@ This is a driver for the pmacV3 motion controller with the SINQ communication pr This driver is a standard sinqMotor-derived driver and does not need any specific configuration. For the general configuration, please see https://git.psi.ch/sinq-epics-modules/sinqmotor/-/blob/main/README.md. -The folder "utils" contains utility scripts for working with pmac motor controllers: +The folder "utils" contains utility scripts for working with pmac motor controllers. To read their manual, run the scripts without any arguments. - writeRead.py: Allows sending commands to and receiving commands from a pmac controller over an ethernet connection. -- analyzeTcpDump: This script takes a tcpdump as an input and +- analyzeTcpDump.py: Parse the TCP communication between an IOC and a MCU and format it into a dictionary. "demo.py" shows how this data can be easily visualized for analysis. + ## Developer guide diff --git a/utils/analyzeTcpDump/analyzeTcpDump.py b/utils/analyzeTcpDump/analyzeTcpDump.py new file mode 100755 index 0000000..701bbd0 --- /dev/null +++ b/utils/analyzeTcpDump/analyzeTcpDump.py @@ -0,0 +1,200 @@ +#! venv/bin/python3 +""" +This script can be used to format the communication between an EPICS IOC and a +PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump +and rearranging the information in a more structured manner. + +To read the manual, simply run this script without any arguments. + +Stefan Mathis, January 2025 +""" + +from scapy.all import * +import json +import re +import codecs +import os +from datetime import datetime + +def parse(fileAndPath): + + try: + scapyCap = PcapReader(fileAndPath) + except: + print(f"Could not read file {fileAndPath} as PCAP file") + + requests = [] + sent = [] + + jsonDict = dict() + lastLayer = None + for packet in scapyCap: + + layer = packet.getlayer(Raw) + + if layer is None: + continue + + # Skip the package if it is not a command or a response. A command ends + # with a carriage return (x0d), a response ends with ACKNOWLEDGE (x06) + last = layer.load[-1] + if last == 6: + isResponse = True + elif last == 13: + isResponse = False + else: + continue + + # Store the info by the IP adress of the MCU + if isResponse: + ip = packet[IP].src + else: + ip = packet[IP].dst + + if ip not in jsonDict: + jsonDict[ip] = dict() + + # Convert to ASCII + ascii = layer.load.decode("unicode_escape") + + # Convert the time to a float + time = float(packet.time) + + if isResponse: + + # A response is always a number followed by a carriage return + responses = re.findall("-?\d+\.\d+\r|-?\d+\r", ascii) + + # Check if the number of responses matches the number of requests + valid = len(responses) == len(requests) + + # Pair up the request-response pairs + for (request, response) in zip(requests, responses): + if request not in jsonDict[ip]: + jsonDict[ip][request] = dict() + + if "." in response: + value = float(response) + else: + value = int(response) + + lastLayer = lastPacket.getlayer(Raw) + lastTime = int(lastPacket.time) + data = { + 'command': { + 'hex': [format(value, '02x') for value in lastLayer.load], + 'ascii': lastLayer.load.decode("unicode_escape"), + 'timestamp': lastTime, + 'timeiso': str(datetime.fromtimestamp(lastTime).isoformat()), + }, + 'response': { + 'hex': [format(value, '02x') for value in layer.load], + 'ascii': ascii, + 'value': value, + 'timestamp': time, + 'timeiso': str(datetime.fromtimestamp(time).isoformat()), + 'valid': valid + } + } + jsonDict[ip][request][time] = data + else: + requests.clear() + sent.clear() + + # Store the packet for use in the response iteration + lastPacket = packet + + # Parse the ASCII text via regex. A PMAC command usually has the + # format LDDDD(=), where L is a capital letter, the first + # two digits D are the axis number and the last two digits together + # with the letter form the command. + + # Separate the commands into sent data (e.g. setting a position) + # and data requests (e.g. reading the axis status). Sent data always + # has an equal sign. + for command in re.findall("[A-Z]\d+=-?\d+|[A-Z]\d+", ascii): + if "=" in command: + sent.append(command) + else: + requests.append(command) + + # Store the sent. The requests yfd stored together with the responses later. + for command in sent: + splitted = command.split("=") + key = splitted[0] + key = key + "=" + if key not in jsonDict[ip]: + jsonDict[ip][key] = dict() + + if "." in splitted[1]: + value = float(splitted[1]) + else: + value = int(splitted[1]) + + data = { + 'command': { + 'hex': [format(value, '02x') for value in layer.load], + 'ascii': ascii, + 'value': value, + 'timestamp': time, + 'timeiso': str(datetime.fromtimestamp(time).isoformat()), + }, + } + + jsonDict[ip][key][time] = data + + return jsonDict + + +if __name__ == "__main__": + + isInstalled = False + try: + from scapy.all import * + isInstalled = True + + except ImportError: + print("This script needs the Scapy package to run. In order to install a " + "suitable virtual environment, use the 'makevenv' script.") + + if isInstalled: + from sys import argv + + if len(argv) < 2: + print(""" +This script can be used to format the communication between an EPICS IOC and a +PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump +and rearranging the information in a more structured manner. + +After a successfull parse run, the resulting JSON data looks like this: + + (e.g. Q0100 to request the position of axis 1) + + Command + + (e.g. P0100) + + Response + + + (e.g. Q0100= to set the position of axis 1) + + + + + + """) + + else: + for fileAndPath in argv[1:]: + jsonDict = parse(fileAndPath) + + # Save the dict into a JSON + fileName = os.path.basename(fileAndPath) + jsonfileAndPath = f"{fileName}.json" + with open(jsonfileAndPath, 'w') as fp: + json.dump(jsonDict, fp, indent=4) + + print(f"Stored parse result of {fileAndPath} in {fileName}") diff --git a/utils/analyzeTcpDump/demo.pcap b/utils/analyzeTcpDump/demo.pcap new file mode 100644 index 0000000..62bed7f Binary files /dev/null and b/utils/analyzeTcpDump/demo.pcap differ diff --git a/utils/analyzeTcpDump/demo.py b/utils/analyzeTcpDump/demo.py new file mode 100755 index 0000000..fabe07b --- /dev/null +++ b/utils/analyzeTcpDump/demo.py @@ -0,0 +1,83 @@ +#! demovenv/bin/python3 +""" +This demo script shows how the "parse" function of "analyzeTcpDump.py" can be +used to easily visualize data from a PCAP file created by the tcpdump tool / +wireshark. A suitable virtual environment can be created with the "makedemovenv" +script. + +Stefan Mathis, January 2025 +""" + +import matplotlib.pyplot as plt +import matplotlib.dates as mdates +from datetime import datetime, timedelta + +from analyzeTcpDump import parse + +if __name__ == "__main__": + + data = parse("demo.pcap") + + plt.figure(figsize=(12, 6)) + + # Plot the position of axis 5 over time + + # Actual position + position_valid = [] + dates_valid = [] + position_all = [] + dates_all = [] + + for (timestamp, item) in data["172.28.101.24"]["Q0510"].items(): + date = datetime.fromtimestamp(timestamp) + value = item["response"]["value"] + + dates_all.append(date) + position_all.append(value) + + if item["response"]["valid"]: + dates_valid.append(date) + position_valid.append(value) + else: + command = item["command"]["ascii"] + response = item["response"]["ascii"] + + # Replace non-renderable characters + command = command.replace("\0", "\\x00") + command = command.replace("\r", "\\x0d") + command = command.replace("\x12", "\\x12") + response = response.replace("\r", "\\x0d") + response = response.replace("\06", "\\x06") + + # Shift the text a bit to the right + plt.text(date, value, f"Command: {command}\nResponse: {response}", horizontalalignment="right", verticalalignment="top") + + # Target position + position_target = [position_valid[0]] + dates_target = [dates_valid[0]] + + for (timestamp, item) in data["172.28.101.24"]["Q0501="].items(): + date = datetime.fromtimestamp(timestamp) + value = item["command"]["value"] + + dates_target.append(date) + position_target.append(position_target[-1]) + + dates_target.append(date) + position_target.append(value) + + dates_target.append(dates_valid[-1]) + position_target.append(position_target[-1]) + + plt.plot(dates_target, position_target, "k--", label="Target position") + plt.plot(dates_all, position_all, "r", label="All responses") + plt.plot(dates_valid, position_valid, "b", label="Valid responses") + plt.xlabel("Time (ISO 8601)") + plt.ylabel("Axis position in degree") + plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%dT%H:%M:%S")) + plt.xticks(rotation=45) + plt.grid(True) + plt.legend(loc="lower left") + plt.title("Position of axis 5") + plt.tight_layout() + plt.show() diff --git a/utils/analyzeTcpDump/makedemovenv b/utils/analyzeTcpDump/makedemovenv new file mode 100755 index 0000000..2fbb311 --- /dev/null +++ b/utils/analyzeTcpDump/makedemovenv @@ -0,0 +1,22 @@ +#!/bin/bash +#------------------------------------------------------------------------- +# Script which installs a virtual environment for PCAP file parsing +# +# Stefan Mathis, September 2024 +#------------------------------------------------------------------------- + +# Remove any previous testing environment +if [ -d "demovenv" ]; then + rm -r demovenv +fi + +/usr/bin/python3.11 -m venv demovenv + +source demovenv/bin/activate + +pip install --upgrade pip +pip install "scapy>=2.5,<3.0" +pip install matplotlib + +# Exit the virtual environment +exit \ No newline at end of file diff --git a/utils/analyzeTcpDump/makevenv b/utils/analyzeTcpDump/makevenv new file mode 100755 index 0000000..1e71296 --- /dev/null +++ b/utils/analyzeTcpDump/makevenv @@ -0,0 +1,21 @@ +#!/bin/bash +#------------------------------------------------------------------------- +# Script which installs a virtual environment for PCAP file parsing +# +# Stefan Mathis, September 2024 +#------------------------------------------------------------------------- + +# Remove any previous testing environment +if [ -d "venv" ]; then + rm -r venv +fi + +/usr/bin/python3.11 -m venv venv + +source venv/bin/activate + +pip install --upgrade pip +pip install "scapy>=2.5,<3.0" + +# Exit the virtual environment +exit \ No newline at end of file diff --git a/utils/writeRead.py b/utils/writeRead.py new file mode 100644 index 0000000..34eef0a --- /dev/null +++ b/utils/writeRead.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +""" +This script allows direct interaction with a pmac-Controller over an ethernet connection. +To read the manual, simply run this script without any arguments. + +Stefan Mathis, December 2024 +""" + +import struct +import socket +import curses + +def packPmacCommand(command): + # 0x40 = VR_DOWNLOAD + # 0xBF = VR_PMAC_GETRESPONSE + buf = struct.pack('BBHHH',0x40,0xBF,0,0,socket.htons(len(command))) + buf = buf + bytes(command,'utf-8') + return buf + +def readPmacReply(input): + msg = bytearray() + expectAck = True + while True: + b = input.recv(1) + bint = int.from_bytes(b,byteorder='little') + if bint == 2 or bint == 7: #STX or BELL + expectAck = False + continue + if expectAck and bint == 6: # ACK + return bytes(msg) + else: + if bint == 13 and not expectAck: # CR + return bytes(msg) + else: + msg.append(bint) + +if __name__ == "__main__": + from sys import argv + + try: + + addr = argv[1].split(':') + s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) + s.connect((addr[0],int(addr[1]))) + + if len(argv) == 3: + buf = packPmacCommand(argv[2]) + s.send(buf) + reply = readPmacReply(s) + print(reply.decode('utf-8') + '\n') + + else: + + try: + + stdscr = curses.initscr() + curses.noecho() + curses.cbreak() + stdscr.keypad(True) + stdscr.scrollok(True) + + stdscr.addstr(">> ") + stdscr.refresh() + + history = [""] + ptr = len(history) - 1 + + while True: + c = stdscr.getch() + if c == curses.KEY_RIGHT: + (y, x) = stdscr.getyx() + if x < len(history[ptr]) + 3: + stdscr.move(y, x+1) + stdscr.refresh() + elif c == curses.KEY_LEFT: + (y, x) = stdscr.getyx() + if x > 3: + stdscr.move(y, x-1) + stdscr.refresh() + elif c == curses.KEY_UP: + if ptr > 0: + ptr -= 1 + stdscr.addch("\r") + stdscr.clrtoeol() + stdscr.addstr(">> " + history[ptr]) + elif c == curses.KEY_DOWN: + if ptr < len(history) - 1: + ptr += 1 + stdscr.addch("\r") + stdscr.clrtoeol() + stdscr.addstr(">> " + history[ptr]) + elif c == curses.KEY_ENTER or c == ord('\n') or c == ord('\r'): + if history[ptr] == 'quit': + break + + # because of arrow keys move back to the end of the line + (y, x) = stdscr.getyx() + stdscr.move(y, 3+len(history[ptr])) + + if history[ptr]: + buf = packPmacCommand(history[ptr]) + s.send(buf) + reply = readPmacReply(s) + stdscr.addstr("\n" + reply.decode('utf-8')[0:-1]) + + if ptr == len(history) - 1 and history[ptr] != "": + history += [""] + else: + history[-1] = "" + ptr = len(history) - 1 + + stdscr.addstr("\n>> ") + stdscr.refresh() + + else: + if ptr < len(history) - 1: # Modifying previous input + if len(history[-1]) == 0: + history[-1] = history[ptr] + ptr = len(history) - 1 + + else: + history += [history[ptr]] + ptr = len(history) - 1 + + if c == curses.KEY_BACKSPACE: + if len(history[ptr]) == 0: + continue + (y, x) = stdscr.getyx() + history[ptr] = history[ptr][0:x-4] + history[ptr][x-3:] + stdscr.addch("\r") + stdscr.clrtoeol() + stdscr.addstr(">> " + history[ptr]) + stdscr.move(y, x-1) + stdscr.refresh() + + else: + (y, x) = stdscr.getyx() + history[ptr] = history[ptr][0:x-3] + chr(c) + history[ptr][x-3:] + stdscr.addch("\r") + stdscr.clrtoeol() + stdscr.addstr(">> " + history[ptr]) + stdscr.move(y, x+1) + stdscr.refresh() + + finally: + + # to quit + curses.nocbreak() + stdscr.keypad(False) + curses.echo() + curses.endwin() + + except: + print(""" + Invalid Arguments + + Option 1: Single Command + ------------------------ + + Usage: writeRead.py pmachost:port command + This then returns the response for command. + + Option 2: CLI Mode + ------------------ + + Usage: writeRead.py pmachost:port + + You can then type in a command, hit enter, and the response will see + the reponse, before being prompted to again enter a command. Type + 'quit' to close prompt. + """) +