Compare commits

...

1 Commits
0.4.3 ... 0.5.0

Author SHA1 Message Date
83051e10c3 Added two utility scripts for working with PMAC motors 2025-01-09 13:26:12 +01:00
7 changed files with 501 additions and 2 deletions

View File

@ -8,9 +8,10 @@ This is a driver for the pmacV3 motion controller with the SINQ communication pr
This driver is a standard sinqMotor-derived driver and does not need any specific configuration. For the general configuration, please see https://git.psi.ch/sinq-epics-modules/sinqmotor/-/blob/main/README.md.
The folder "utils" contains utility scripts for working with pmac motor controllers:
The folder "utils" contains utility scripts for working with pmac motor controllers. To read their manual, run the scripts without any arguments.
- writeRead.py: Allows sending commands to and receiving commands from a pmac controller over an ethernet connection.
- analyzeTcpDump: This script takes a tcpdump as an input and
- analyzeTcpDump.py: Parse the TCP communication between an IOC and a MCU and format it into a dictionary. "demo.py" shows how this data can be easily visualized for analysis.
## Developer guide

View File

@ -0,0 +1,200 @@
#! venv/bin/python3
"""
This script can be used to format the communication between an EPICS IOC and a
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
and rearranging the information in a more structured manner.
To read the manual, simply run this script without any arguments.
Stefan Mathis, January 2025
"""
from scapy.all import *
import json
import re
import codecs
import os
from datetime import datetime
def parse(fileAndPath):
try:
scapyCap = PcapReader(fileAndPath)
except:
print(f"Could not read file {fileAndPath} as PCAP file")
requests = []
sent = []
jsonDict = dict()
lastLayer = None
for packet in scapyCap:
layer = packet.getlayer(Raw)
if layer is None:
continue
# Skip the package if it is not a command or a response. A command ends
# with a carriage return (x0d), a response ends with ACKNOWLEDGE (x06)
last = layer.load[-1]
if last == 6:
isResponse = True
elif last == 13:
isResponse = False
else:
continue
# Store the info by the IP adress of the MCU
if isResponse:
ip = packet[IP].src
else:
ip = packet[IP].dst
if ip not in jsonDict:
jsonDict[ip] = dict()
# Convert to ASCII
ascii = layer.load.decode("unicode_escape")
# Convert the time to a float
time = float(packet.time)
if isResponse:
# A response is always a number followed by a carriage return
responses = re.findall("-?\d+\.\d+\r|-?\d+\r", ascii)
# Check if the number of responses matches the number of requests
valid = len(responses) == len(requests)
# Pair up the request-response pairs
for (request, response) in zip(requests, responses):
if request not in jsonDict[ip]:
jsonDict[ip][request] = dict()
if "." in response:
value = float(response)
else:
value = int(response)
lastLayer = lastPacket.getlayer(Raw)
lastTime = int(lastPacket.time)
data = {
'command': {
'hex': [format(value, '02x') for value in lastLayer.load],
'ascii': lastLayer.load.decode("unicode_escape"),
'timestamp': lastTime,
'timeiso': str(datetime.fromtimestamp(lastTime).isoformat()),
},
'response': {
'hex': [format(value, '02x') for value in layer.load],
'ascii': ascii,
'value': value,
'timestamp': time,
'timeiso': str(datetime.fromtimestamp(time).isoformat()),
'valid': valid
}
}
jsonDict[ip][request][time] = data
else:
requests.clear()
sent.clear()
# Store the packet for use in the response iteration
lastPacket = packet
# Parse the ASCII text via regex. A PMAC command usually has the
# format LDDDD(=<Number>), where L is a capital letter, the first
# two digits D are the axis number and the last two digits together
# with the letter form the command.
# Separate the commands into sent data (e.g. setting a position)
# and data requests (e.g. reading the axis status). Sent data always
# has an equal sign.
for command in re.findall("[A-Z]\d+=-?\d+|[A-Z]\d+", ascii):
if "=" in command:
sent.append(command)
else:
requests.append(command)
# Store the sent. The requests yfd stored together with the responses later.
for command in sent:
splitted = command.split("=")
key = splitted[0]
key = key + "="
if key not in jsonDict[ip]:
jsonDict[ip][key] = dict()
if "." in splitted[1]:
value = float(splitted[1])
else:
value = int(splitted[1])
data = {
'command': {
'hex': [format(value, '02x') for value in layer.load],
'ascii': ascii,
'value': value,
'timestamp': time,
'timeiso': str(datetime.fromtimestamp(time).isoformat()),
},
}
jsonDict[ip][key][time] = data
return jsonDict
if __name__ == "__main__":
isInstalled = False
try:
from scapy.all import *
isInstalled = True
except ImportError:
print("This script needs the Scapy package to run. In order to install a "
"suitable virtual environment, use the 'makevenv' script.")
if isInstalled:
from sys import argv
if len(argv) < 2:
print("""
This script can be used to format the communication between an EPICS IOC and a
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
and rearranging the information in a more structured manner.
After a successfull parse run, the resulting JSON data looks like this:
<IP Adress MCU1>
<Request command type> (e.g. Q0100 to request the position of axis 1)
<Event timestamp>
Command
<Raw ASCII string>
<Actual command> (e.g. P0100)
<Timestamp in Epoch>
Response
<Raw ASCII string>
<Actual response (e.g. -3)
<Timestamp in Epoch>
<Set command type> (e.g. Q0100= to set the position of axis 1)
<Event timestamp>
<Raw ASCII string>
<Actual command (e.g. P0100)
<Set value>
<Timestamp in Epoch>
<IP Adress MCU2>
""")
else:
for fileAndPath in argv[1:]:
jsonDict = parse(fileAndPath)
# Save the dict into a JSON
fileName = os.path.basename(fileAndPath)
jsonfileAndPath = f"{fileName}.json"
with open(jsonfileAndPath, 'w') as fp:
json.dump(jsonDict, fp, indent=4)
print(f"Stored parse result of {fileAndPath} in {fileName}")

Binary file not shown.

83
utils/analyzeTcpDump/demo.py Executable file
View File

@ -0,0 +1,83 @@
#! demovenv/bin/python3
"""
This demo script shows how the "parse" function of "analyzeTcpDump.py" can be
used to easily visualize data from a PCAP file created by the tcpdump tool /
wireshark. A suitable virtual environment can be created with the "makedemovenv"
script.
Stefan Mathis, January 2025
"""
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
from analyzeTcpDump import parse
if __name__ == "__main__":
data = parse("demo.pcap")
plt.figure(figsize=(12, 6))
# Plot the position of axis 5 over time
# Actual position
position_valid = []
dates_valid = []
position_all = []
dates_all = []
for (timestamp, item) in data["172.28.101.24"]["Q0510"].items():
date = datetime.fromtimestamp(timestamp)
value = item["response"]["value"]
dates_all.append(date)
position_all.append(value)
if item["response"]["valid"]:
dates_valid.append(date)
position_valid.append(value)
else:
command = item["command"]["ascii"]
response = item["response"]["ascii"]
# Replace non-renderable characters
command = command.replace("\0", "\\x00")
command = command.replace("\r", "\\x0d")
command = command.replace("\x12", "\\x12")
response = response.replace("\r", "\\x0d")
response = response.replace("\06", "\\x06")
# Shift the text a bit to the right
plt.text(date, value, f"Command: {command}\nResponse: {response}", horizontalalignment="right", verticalalignment="top")
# Target position
position_target = [position_valid[0]]
dates_target = [dates_valid[0]]
for (timestamp, item) in data["172.28.101.24"]["Q0501="].items():
date = datetime.fromtimestamp(timestamp)
value = item["command"]["value"]
dates_target.append(date)
position_target.append(position_target[-1])
dates_target.append(date)
position_target.append(value)
dates_target.append(dates_valid[-1])
position_target.append(position_target[-1])
plt.plot(dates_target, position_target, "k--", label="Target position")
plt.plot(dates_all, position_all, "r", label="All responses")
plt.plot(dates_valid, position_valid, "b", label="Valid responses")
plt.xlabel("Time (ISO 8601)")
plt.ylabel("Axis position in degree")
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%dT%H:%M:%S"))
plt.xticks(rotation=45)
plt.grid(True)
plt.legend(loc="lower left")
plt.title("Position of axis 5")
plt.tight_layout()
plt.show()

View File

@ -0,0 +1,22 @@
#!/bin/bash
#-------------------------------------------------------------------------
# Script which installs a virtual environment for PCAP file parsing
#
# Stefan Mathis, September 2024
#-------------------------------------------------------------------------
# Remove any previous testing environment
if [ -d "demovenv" ]; then
rm -r demovenv
fi
/usr/bin/python3.11 -m venv demovenv
source demovenv/bin/activate
pip install --upgrade pip
pip install "scapy>=2.5,<3.0"
pip install matplotlib
# Exit the virtual environment
exit

21
utils/analyzeTcpDump/makevenv Executable file
View File

@ -0,0 +1,21 @@
#!/bin/bash
#-------------------------------------------------------------------------
# Script which installs a virtual environment for PCAP file parsing
#
# Stefan Mathis, September 2024
#-------------------------------------------------------------------------
# Remove any previous testing environment
if [ -d "venv" ]; then
rm -r venv
fi
/usr/bin/python3.11 -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install "scapy>=2.5,<3.0"
# Exit the virtual environment
exit

172
utils/writeRead.py Normal file
View File

@ -0,0 +1,172 @@
#!/usr/bin/env python3
"""
This script allows direct interaction with a pmac-Controller over an ethernet connection.
To read the manual, simply run this script without any arguments.
Stefan Mathis, December 2024
"""
import struct
import socket
import curses
def packPmacCommand(command):
# 0x40 = VR_DOWNLOAD
# 0xBF = VR_PMAC_GETRESPONSE
buf = struct.pack('BBHHH',0x40,0xBF,0,0,socket.htons(len(command)))
buf = buf + bytes(command,'utf-8')
return buf
def readPmacReply(input):
msg = bytearray()
expectAck = True
while True:
b = input.recv(1)
bint = int.from_bytes(b,byteorder='little')
if bint == 2 or bint == 7: #STX or BELL
expectAck = False
continue
if expectAck and bint == 6: # ACK
return bytes(msg)
else:
if bint == 13 and not expectAck: # CR
return bytes(msg)
else:
msg.append(bint)
if __name__ == "__main__":
from sys import argv
try:
addr = argv[1].split(':')
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((addr[0],int(addr[1])))
if len(argv) == 3:
buf = packPmacCommand(argv[2])
s.send(buf)
reply = readPmacReply(s)
print(reply.decode('utf-8') + '\n')
else:
try:
stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
stdscr.keypad(True)
stdscr.scrollok(True)
stdscr.addstr(">> ")
stdscr.refresh()
history = [""]
ptr = len(history) - 1
while True:
c = stdscr.getch()
if c == curses.KEY_RIGHT:
(y, x) = stdscr.getyx()
if x < len(history[ptr]) + 3:
stdscr.move(y, x+1)
stdscr.refresh()
elif c == curses.KEY_LEFT:
(y, x) = stdscr.getyx()
if x > 3:
stdscr.move(y, x-1)
stdscr.refresh()
elif c == curses.KEY_UP:
if ptr > 0:
ptr -= 1
stdscr.addch("\r")
stdscr.clrtoeol()
stdscr.addstr(">> " + history[ptr])
elif c == curses.KEY_DOWN:
if ptr < len(history) - 1:
ptr += 1
stdscr.addch("\r")
stdscr.clrtoeol()
stdscr.addstr(">> " + history[ptr])
elif c == curses.KEY_ENTER or c == ord('\n') or c == ord('\r'):
if history[ptr] == 'quit':
break
# because of arrow keys move back to the end of the line
(y, x) = stdscr.getyx()
stdscr.move(y, 3+len(history[ptr]))
if history[ptr]:
buf = packPmacCommand(history[ptr])
s.send(buf)
reply = readPmacReply(s)
stdscr.addstr("\n" + reply.decode('utf-8')[0:-1])
if ptr == len(history) - 1 and history[ptr] != "":
history += [""]
else:
history[-1] = ""
ptr = len(history) - 1
stdscr.addstr("\n>> ")
stdscr.refresh()
else:
if ptr < len(history) - 1: # Modifying previous input
if len(history[-1]) == 0:
history[-1] = history[ptr]
ptr = len(history) - 1
else:
history += [history[ptr]]
ptr = len(history) - 1
if c == curses.KEY_BACKSPACE:
if len(history[ptr]) == 0:
continue
(y, x) = stdscr.getyx()
history[ptr] = history[ptr][0:x-4] + history[ptr][x-3:]
stdscr.addch("\r")
stdscr.clrtoeol()
stdscr.addstr(">> " + history[ptr])
stdscr.move(y, x-1)
stdscr.refresh()
else:
(y, x) = stdscr.getyx()
history[ptr] = history[ptr][0:x-3] + chr(c) + history[ptr][x-3:]
stdscr.addch("\r")
stdscr.clrtoeol()
stdscr.addstr(">> " + history[ptr])
stdscr.move(y, x+1)
stdscr.refresh()
finally:
# to quit
curses.nocbreak()
stdscr.keypad(False)
curses.echo()
curses.endwin()
except:
print("""
Invalid Arguments
Option 1: Single Command
------------------------
Usage: writeRead.py pmachost:port command
This then returns the response for command.
Option 2: CLI Mode
------------------
Usage: writeRead.py pmachost:port
You can then type in a command, hit enter, and the response will see
the reponse, before being prompted to again enter a command. Type
'quit' to close prompt.
""")