202 lines
6.6 KiB
Python
Executable File
202 lines
6.6 KiB
Python
Executable File
#! venv/bin/python3
|
|
"""
|
|
This script can be used to format the communication between an EPICS IOC and a
|
|
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
|
|
and rearranging the information in a more structured manner.
|
|
|
|
To read the manual, simply run this script without any arguments.
|
|
|
|
Stefan Mathis, January 2025
|
|
"""
|
|
|
|
from scapy.all import *
|
|
import json
|
|
import re
|
|
import codecs
|
|
import os
|
|
from datetime import datetime
|
|
|
|
def parse(fileAndPath):
    """
    Parse a PCAP capture of the IOC <-> PMAC traffic into a nested dictionary.

    Only packets with a non-empty raw payload and an IP layer are considered.
    A payload ending with a carriage return (0x0d) is treated as a command
    sent to the MCU, a payload ending with ACKNOWLEDGE (0x06) is treated as a
    response of the MCU. All other packets are skipped.

    The returned dictionary has the layout

        {<MCU IP>: {<command key>: {<timestamp>: <data>}}}

    where the command key is either a request (e.g. "Q0100") or a set command
    with a trailing equal sign (e.g. "Q0100="). Requests are stored together
    with the response they were paired with; set commands are stored on their
    own when the command packet is read.

    Pending requests are tracked per MCU IP, so the interleaved traffic of
    several MCUs within one capture is not paired across controllers.

    Arguments:
        fileAndPath: Path of the PCAP file to read.

    Returns:
        The nested dictionary described above. If the file cannot be opened
        as a PCAP file, a message is printed and an empty dictionary is
        returned.
    """
    try:
        reader = PcapReader(fileAndPath)
    except Exception:
        # Best effort: report the unreadable file and hand back an empty
        # result so that the caller can continue with the next file.
        print(f"Could not read file {fileAndPath} as PCAP file")
        return dict()

    # A response is always a number followed by a carriage return.
    responsePattern = re.compile(r"-?\d+\.\d+\r|-?\d+\r")

    # A PMAC command usually has the format LDDDD(=<Number>), where L is a
    # capital letter, the first two digits D are the axis number and the last
    # two digits together with the letter form the command. The optional
    # fraction keeps decimal set values (e.g. "Q0100=1.5") intact.
    commandPattern = re.compile(r"[A-Z]\d+=-?\d+(?:\.\d+)?|[A-Z]\d+")

    jsonDict = dict()

    # Maps an MCU IP to the tuple (last command packet, requests it contained)
    pending = dict()

    # The context manager closes the capture file once iteration is done
    with reader:
        for packet in reader:

            layer = packet.getlayer(Raw)
            if layer is None or not layer.load:
                continue

            # Skip the packet if it is not a command or a response. A command
            # ends with a carriage return (x0d), a response ends with
            # ACKNOWLEDGE (x06)
            terminator = layer.load[-1]
            if terminator == 6:
                isResponse = True
            elif terminator == 13:
                isResponse = False
            else:
                continue

            # Without an IP layer the packet cannot be assigned to an MCU
            if not packet.haslayer(IP):
                continue

            # Store the info by the IP address of the MCU
            ip = packet[IP].src if isResponse else packet[IP].dst
            mcuDict = jsonDict.setdefault(ip, dict())

            # Convert to ASCII
            text = layer.load.decode("unicode_escape")

            # Convert the time to a float
            timestamp = float(packet.time)

            if isResponse:
                # A response without a preceding command to this MCU cannot
                # be paired with anything.
                if ip not in pending:
                    continue
                commandPacket, requests = pending[ip]

                responses = responsePattern.findall(text)

                # Check if the number of responses matches the number of
                # requests
                valid = len(responses) == len(requests)

                commandLayer = commandPacket.getlayer(Raw)
                commandTime = float(commandPacket.time)

                # Pair up the request-response pairs
                for request, response in zip(requests, responses):
                    data = {
                        'command': {
                            'hex': _hexBytes(commandLayer.load),
                            'ascii': commandLayer.load.decode("unicode_escape"),
                            'timestamp': commandTime,
                            'timeiso': _isoTime(commandTime),
                        },
                        'response': {
                            'hex': _hexBytes(layer.load),
                            'ascii': text,
                            'value': _toNumber(response),
                            'timestamp': timestamp,
                            'timeiso': _isoTime(timestamp),
                            'valid': valid
                        }
                    }
                    mcuDict.setdefault(request, dict())[timestamp] = data
            else:
                # Separate the commands into sent data (e.g. setting a
                # position) and data requests (e.g. reading the axis status).
                # Sent data always has an equal sign.
                requests = []
                sent = []
                for command in commandPattern.findall(text):
                    if "=" in command:
                        sent.append(command)
                    else:
                        requests.append(command)

                # Remember the packet and its requests for the response
                # iteration of this MCU
                pending[ip] = (packet, requests)

                # Store the sent data. The requests are stored together with
                # the responses later.
                for command in sent:
                    name, _, rawValue = command.partition("=")
                    data = {
                        'command': {
                            'hex': _hexBytes(layer.load),
                            'ascii': text,
                            'value': _toNumber(rawValue),
                            'timestamp': timestamp,
                            'timeiso': _isoTime(timestamp),
                        },
                    }
                    mcuDict.setdefault(name + "=", dict())[timestamp] = data

    return jsonDict


def _toNumber(text):
    """Convert a numeric string to float if it has a decimal point, else int."""
    return float(text) if "." in text else int(text)


def _hexBytes(payload):
    """Return the bytes of a payload as a list of two-digit hex strings."""
    return [format(byte, '02x') for byte in payload]


def _isoTime(timestamp):
    """Format an epoch timestamp as ISO 8601 string in local time."""
    return datetime.fromtimestamp(timestamp).isoformat()
|
|
|
|
|
|
# Script entry point: without arguments the manual is printed, otherwise each
# argument is parsed as a PCAP file and the result is written to
# "<basename of the PCAP file>.json" in the current working directory.
if __name__ == "__main__":

    try:
        from scapy.all import *
    except ImportError:
        print("This script needs the Scapy package to run. In order to install a "
              "suitable virtual environment, use the 'makevenv' script.")
    else:
        from sys import argv

        if len(argv) < 2:
            print("""
This script can be used to format the communication between an EPICS IOC and a
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
and rearranging the information in a more structured manner.

After a successfull parse run, the resulting JSON data looks like this:
<IP Adress MCU1>
    <Request command type> (e.g. Q0100 to request the position of axis 1)
        <Event timestamp>
            Command
                <Raw ASCII string>
                <Actual command> (e.g. P0100)
                <Timestamp in Epoch>
            Response
                <Raw ASCII string>
                <Actual response (e.g. -3)
                <Timestamp in Epoch>
    <Set command type> (e.g. Q0100= to set the position of axis 1)
        <Event timestamp>
            Command
                <Raw ASCII string>
                <Actual command (e.g. P0100)
                <Set value>
                <Timestamp in Epoch>
<IP Adress MCU2>
""")

        else:
            for fileAndPath in argv[1:]:
                jsonDict = parse(fileAndPath)

                # Save the dict into a JSON file named after the PCAP file
                fileName = os.path.basename(fileAndPath)
                jsonfileAndPath = f"{fileName}.json"
                with open(jsonfileAndPath, 'w', encoding="utf-8") as fp:
                    json.dump(jsonDict, fp, indent=4)

                # Report the file that was actually written (including the
                # ".json" suffix), not just the PCAP base name.
                print(f"Stored parse result of {fileAndPath} in {jsonfileAndPath}")
|