Added two utility scripts for working with PMAC motors
This commit is contained in:
200
utils/analyzeTcpDump/analyzeTcpDump.py
Executable file
200
utils/analyzeTcpDump/analyzeTcpDump.py
Executable file
@@ -0,0 +1,200 @@
|
||||
#! venv/bin/python3
|
||||
"""
|
||||
This script can be used to format the communication between an EPICS IOC and a
|
||||
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
|
||||
and rearranging the information in a more structured manner.
|
||||
|
||||
To read the manual, simply run this script without any arguments.
|
||||
|
||||
Stefan Mathis, January 2025
|
||||
"""
|
||||
|
||||
from scapy.all import *
|
||||
import json
|
||||
import re
|
||||
import codecs
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
def parse(fileAndPath):
|
||||
|
||||
try:
|
||||
scapyCap = PcapReader(fileAndPath)
|
||||
except:
|
||||
print(f"Could not read file {fileAndPath} as PCAP file")
|
||||
|
||||
requests = []
|
||||
sent = []
|
||||
|
||||
jsonDict = dict()
|
||||
lastLayer = None
|
||||
for packet in scapyCap:
|
||||
|
||||
layer = packet.getlayer(Raw)
|
||||
|
||||
if layer is None:
|
||||
continue
|
||||
|
||||
# Skip the package if it is not a command or a response. A command ends
|
||||
# with a carriage return (x0d), a response ends with ACKNOWLEDGE (x06)
|
||||
last = layer.load[-1]
|
||||
if last == 6:
|
||||
isResponse = True
|
||||
elif last == 13:
|
||||
isResponse = False
|
||||
else:
|
||||
continue
|
||||
|
||||
# Store the info by the IP adress of the MCU
|
||||
if isResponse:
|
||||
ip = packet[IP].src
|
||||
else:
|
||||
ip = packet[IP].dst
|
||||
|
||||
if ip not in jsonDict:
|
||||
jsonDict[ip] = dict()
|
||||
|
||||
# Convert to ASCII
|
||||
ascii = layer.load.decode("unicode_escape")
|
||||
|
||||
# Convert the time to a float
|
||||
time = float(packet.time)
|
||||
|
||||
if isResponse:
|
||||
|
||||
# A response is always a number followed by a carriage return
|
||||
responses = re.findall("-?\d+\.\d+\r|-?\d+\r", ascii)
|
||||
|
||||
# Check if the number of responses matches the number of requests
|
||||
valid = len(responses) == len(requests)
|
||||
|
||||
# Pair up the request-response pairs
|
||||
for (request, response) in zip(requests, responses):
|
||||
if request not in jsonDict[ip]:
|
||||
jsonDict[ip][request] = dict()
|
||||
|
||||
if "." in response:
|
||||
value = float(response)
|
||||
else:
|
||||
value = int(response)
|
||||
|
||||
lastLayer = lastPacket.getlayer(Raw)
|
||||
lastTime = int(lastPacket.time)
|
||||
data = {
|
||||
'command': {
|
||||
'hex': [format(value, '02x') for value in lastLayer.load],
|
||||
'ascii': lastLayer.load.decode("unicode_escape"),
|
||||
'timestamp': lastTime,
|
||||
'timeiso': str(datetime.fromtimestamp(lastTime).isoformat()),
|
||||
},
|
||||
'response': {
|
||||
'hex': [format(value, '02x') for value in layer.load],
|
||||
'ascii': ascii,
|
||||
'value': value,
|
||||
'timestamp': time,
|
||||
'timeiso': str(datetime.fromtimestamp(time).isoformat()),
|
||||
'valid': valid
|
||||
}
|
||||
}
|
||||
jsonDict[ip][request][time] = data
|
||||
else:
|
||||
requests.clear()
|
||||
sent.clear()
|
||||
|
||||
# Store the packet for use in the response iteration
|
||||
lastPacket = packet
|
||||
|
||||
# Parse the ASCII text via regex. A PMAC command usually has the
|
||||
# format LDDDD(=<Number>), where L is a capital letter, the first
|
||||
# two digits D are the axis number and the last two digits together
|
||||
# with the letter form the command.
|
||||
|
||||
# Separate the commands into sent data (e.g. setting a position)
|
||||
# and data requests (e.g. reading the axis status). Sent data always
|
||||
# has an equal sign.
|
||||
for command in re.findall("[A-Z]\d+=-?\d+|[A-Z]\d+", ascii):
|
||||
if "=" in command:
|
||||
sent.append(command)
|
||||
else:
|
||||
requests.append(command)
|
||||
|
||||
# Store the sent. The requests yfd stored together with the responses later.
|
||||
for command in sent:
|
||||
splitted = command.split("=")
|
||||
key = splitted[0]
|
||||
key = key + "="
|
||||
if key not in jsonDict[ip]:
|
||||
jsonDict[ip][key] = dict()
|
||||
|
||||
if "." in splitted[1]:
|
||||
value = float(splitted[1])
|
||||
else:
|
||||
value = int(splitted[1])
|
||||
|
||||
data = {
|
||||
'command': {
|
||||
'hex': [format(value, '02x') for value in layer.load],
|
||||
'ascii': ascii,
|
||||
'value': value,
|
||||
'timestamp': time,
|
||||
'timeiso': str(datetime.fromtimestamp(time).isoformat()),
|
||||
},
|
||||
}
|
||||
|
||||
jsonDict[ip][key][time] = data
|
||||
|
||||
return jsonDict
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
isInstalled = False
|
||||
try:
|
||||
from scapy.all import *
|
||||
isInstalled = True
|
||||
|
||||
except ImportError:
|
||||
print("This script needs the Scapy package to run. In order to install a "
|
||||
"suitable virtual environment, use the 'makevenv' script.")
|
||||
|
||||
if isInstalled:
|
||||
from sys import argv
|
||||
|
||||
if len(argv) < 2:
|
||||
print("""
|
||||
This script can be used to format the communication between an EPICS IOC and a
|
||||
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
|
||||
and rearranging the information in a more structured manner.
|
||||
|
||||
After a successfull parse run, the resulting JSON data looks like this:
|
||||
<IP Adress MCU1>
|
||||
<Request command type> (e.g. Q0100 to request the position of axis 1)
|
||||
<Event timestamp>
|
||||
Command
|
||||
<Raw ASCII string>
|
||||
<Actual command> (e.g. P0100)
|
||||
<Timestamp in Epoch>
|
||||
Response
|
||||
<Raw ASCII string>
|
||||
<Actual response (e.g. -3)
|
||||
<Timestamp in Epoch>
|
||||
<Set command type> (e.g. Q0100= to set the position of axis 1)
|
||||
<Event timestamp>
|
||||
<Raw ASCII string>
|
||||
<Actual command (e.g. P0100)
|
||||
<Set value>
|
||||
<Timestamp in Epoch>
|
||||
<IP Adress MCU2>
|
||||
""")
|
||||
|
||||
else:
|
||||
for fileAndPath in argv[1:]:
|
||||
jsonDict = parse(fileAndPath)
|
||||
|
||||
# Save the dict into a JSON
|
||||
fileName = os.path.basename(fileAndPath)
|
||||
jsonfileAndPath = f"{fileName}.json"
|
||||
with open(jsonfileAndPath, 'w') as fp:
|
||||
json.dump(jsonDict, fp, indent=4)
|
||||
|
||||
print(f"Stored parse result of {fileAndPath} in {fileName}")
|
||||
Reference in New Issue
Block a user