Files
turboPmac/utils/analyzeTcpDump/analyzeTcpDump.py

202 lines
6.6 KiB
Python
Executable File

#! venv/bin/python3
"""
This script can be used to format the communication between an EPICS IOC and a
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
and rearranging the information in a more structured manner.
To read the manual, simply run this script without any arguments.
Stefan Mathis, January 2025
"""
from scapy.all import *
import json
import re
import codecs
import os
from datetime import datetime
def parse(fileAndPath):
try:
scapyCap = PcapReader(fileAndPath)
except:
print(f"Could not read file {fileAndPath} as PCAP file")
requests = []
sent = []
jsonDict = dict()
lastLayer = None
for packet in scapyCap:
layer = packet.getlayer(Raw)
if layer is None:
continue
# Skip the package if it is not a command or a response. A command ends
# with a carriage return (x0d), a response ends with ACKNOWLEDGE (x06)
last = layer.load[-1]
if last == 6:
isResponse = True
elif last == 13:
isResponse = False
else:
continue
# Store the info by the IP adress of the MCU
if isResponse:
ip = packet[IP].src
else:
ip = packet[IP].dst
if ip not in jsonDict:
jsonDict[ip] = dict()
# Convert to ASCII
ascii = layer.load.decode("unicode_escape")
# Convert the time to a float
time = float(packet.time)
if isResponse:
# A response is always a number followed by a carriage return
responses = re.findall("-?\d+\.\d+\r|-?\d+\r", ascii)
# Check if the number of responses matches the number of requests
valid = len(responses) == len(requests)
# Pair up the request-response pairs
for (request, response) in zip(requests, responses):
if request not in jsonDict[ip]:
jsonDict[ip][request] = dict()
if "." in response:
value = float(response)
else:
value = int(response)
lastLayer = lastPacket.getlayer(Raw)
lastTime = float(lastPacket.time)
data = {
'command': {
'hex': [format(value, '02x') for value in lastLayer.load],
'ascii': lastLayer.load.decode("unicode_escape"),
'timestamp': lastTime,
'timeiso': str(datetime.fromtimestamp(lastTime).isoformat()),
},
'response': {
'hex': [format(value, '02x') for value in layer.load],
'ascii': ascii,
'value': value,
'timestamp': time,
'timeiso': str(datetime.fromtimestamp(time).isoformat()),
'valid': valid
}
}
jsonDict[ip][request][time] = data
else:
requests.clear()
sent.clear()
# Store the packet for use in the response iteration
lastPacket = packet
# Parse the ASCII text via regex. A PMAC command usually has the
# format LDDDD(=<Number>), where L is a capital letter, the first
# two digits D are the axis number and the last two digits together
# with the letter form the command.
# Separate the commands into sent data (e.g. setting a position)
# and data requests (e.g. reading the axis status). Sent data always
# has an equal sign.
for command in re.findall("[A-Z]\d+=-?\d+|[A-Z]\d+", ascii):
if "=" in command:
sent.append(command)
else:
requests.append(command)
# Store the sent. The requests yfd stored together with the responses later.
for command in sent:
splitted = command.split("=")
key = splitted[0]
key = key + "="
if key not in jsonDict[ip]:
jsonDict[ip][key] = dict()
if "." in splitted[1]:
value = float(splitted[1])
else:
value = int(splitted[1])
data = {
'command': {
'hex': [format(value, '02x') for value in layer.load],
'ascii': ascii,
'value': value,
'timestamp': time,
'timeiso': str(datetime.fromtimestamp(time).isoformat()),
},
}
jsonDict[ip][key][time] = data
return jsonDict
if __name__ == "__main__":
isInstalled = False
try:
from scapy.all import *
isInstalled = True
except ImportError:
print("This script needs the Scapy package to run. In order to install a "
"suitable virtual environment, use the 'makevenv' script.")
if isInstalled:
from sys import argv
if len(argv) < 2:
print("""
This script can be used to format the communication between an EPICS IOC and a
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
and rearranging the information in a more structured manner.
After a successfull parse run, the resulting JSON data looks like this:
<IP Adress MCU1>
<Request command type> (e.g. Q0100 to request the position of axis 1)
<Event timestamp>
Command
<Raw ASCII string>
<Actual command> (e.g. P0100)
<Timestamp in Epoch>
Response
<Raw ASCII string>
<Actual response (e.g. -3)
<Timestamp in Epoch>
<Set command type> (e.g. Q0100= to set the position of axis 1)
<Event timestamp>
Command
<Raw ASCII string>
<Actual command (e.g. P0100)
<Set value>
<Timestamp in Epoch>
<IP Adress MCU2>
""")
else:
for fileAndPath in argv[1:]:
jsonDict = parse(fileAndPath)
# Save the dict into a JSON
fileName = os.path.basename(fileAndPath)
jsonfileAndPath = f"{fileName}.json"
with open(jsonfileAndPath, 'w') as fp:
json.dump(jsonDict, fp, indent=4)
print(f"Stored parse result of {fileAndPath} in {fileName}")