#! venv/bin/python3
"""
This script can be used to format the communication between an EPICS IOC and a
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
and rearranging the information in a more structured manner.

To read the manual, simply run this script without any arguments.

Stefan Mathis, January 2025
"""

import codecs  # Unused in this file; kept so the module namespace is unchanged.
import json
import os
import re
import sys
from datetime import datetime

# Scapy is a third-party dependency. The import is guarded so that the manual
# and the installation hint can still be printed when Scapy is missing;
# parse() raises ImportError in that case.
try:
    from scapy.all import IP, PcapReader, Raw
except ImportError:
    SCAPY_AVAILABLE = False
else:
    SCAPY_AVAILABLE = True

# Last payload byte of a response (ACKNOWLEDGE) and of a command (carriage
# return). Payloads ending in anything else are ignored.
ACKNOWLEDGE = 0x06
CARRIAGE_RETURN = 0x0D

# A response value is an optionally signed integer or decimal number followed
# by a carriage return.
RESPONSE_PATTERN = re.compile(r"-?\d+(?:\.\d+)?\r")

# A command is a capital letter followed by digits. If it is followed by "="
# and an optionally signed integer or decimal number, it sends a value to the
# MCU; otherwise it requests a value from the MCU.
COMMAND_PATTERN = re.compile(r"[A-Z]\d+(?:=-?\d+(?:\.\d+)?)?")

SCAPY_MISSING_MESSAGE = (
    "This script needs the Scapy package to run. In order to install a "
    "suitable virtual environment, use the 'makevenv' script."
)

# NOTE(review): the tree layout below was reconstructed from the dictionary
# built in parse(); confirm it against the intended manual text.
MANUAL = """
This script can be used to format the communication between an EPICS IOC and a
PMAC MCU into JSON files. It does this by parsing PCAP files created by tcpdump
and rearranging the information in a more structured manner.

After a successfull parse run, the resulting JSON data looks like this:

<MCU IP>
    <Request> (e.g. Q0100 to request the position of axis 1)
        <Timestamp>
            Command
            Response
    <Sent command> (e.g. Q0100= to set the position of axis 1)
        <Timestamp>
            Command
"""


def _to_number(text):
    """Return *text* as float if it contains a decimal point, else as int."""
    text = text.strip()
    return float(text) if "." in text else int(text)


def _hex_bytes(payload):
    """Return every byte of *payload* as a two-digit lowercase hex string."""
    return [format(byte, "02x") for byte in payload]


def _iso_time(timestamp):
    """Return *timestamp* (epoch seconds) as a naive local-time ISO string."""
    return datetime.fromtimestamp(timestamp).isoformat()


def _store_responses(mcu_entry, command, payload, text, timestamp):
    """Pair the values in a response payload with the pending requests."""
    requests = command["requests"]
    responses = RESPONSE_PATTERN.findall(text)

    # The pairing is only trustworthy if every request got exactly one value.
    valid = len(responses) == len(requests)

    for request, response in zip(requests, responses):
        mcu_entry.setdefault(request, {})[timestamp] = {
            "command": {
                "hex": _hex_bytes(command["payload"]),
                "ascii": command["text"],
                "timestamp": command["timestamp"],
                "timeiso": _iso_time(command["timestamp"]),
            },
            "response": {
                "hex": _hex_bytes(payload),
                "ascii": text,
                "value": _to_number(response),
                "timestamp": timestamp,
                "timeiso": _iso_time(timestamp),
                "valid": valid,
            },
        }


def _store_command(mcu_entry, payload, text, timestamp):
    """Store the sent values of a command payload and return its requests."""
    requests = []
    for command in COMMAND_PATTERN.findall(text):
        name, separator, raw_value = command.partition("=")
        if not separator:
            # Requests are stored later, together with their responses.
            requests.append(command)
            continue
        mcu_entry.setdefault(name + "=", {})[timestamp] = {
            "command": {
                "hex": _hex_bytes(payload),
                "ascii": text,
                "value": _to_number(raw_value),
                "timestamp": timestamp,
                "timeiso": _iso_time(timestamp),
            },
        }
    return requests


def _process_packet(packet, json_dict, pending):
    """Sort a single packet into *json_dict*, updating *pending* commands."""
    layer = packet.getlayer(Raw)
    if layer is None or not layer.load:
        return
    payload = layer.load

    # Skip the packet if it is not a command or a response. A command ends
    # with a carriage return (x0d), a response ends with ACKNOWLEDGE (x06).
    terminator = payload[-1]
    if terminator == ACKNOWLEDGE:
        is_response = True
    elif terminator == CARRIAGE_RETURN:
        is_response = False
    else:
        return

    # The data is stored by IP address, so packets without an IP layer cannot
    # be sorted and are skipped.
    if not packet.haslayer(IP):
        return

    # Store the info by the IP address of the MCU: the source of a response,
    # the destination of a command.
    mcu_ip = packet[IP].src if is_response else packet[IP].dst
    mcu_entry = json_dict.setdefault(mcu_ip, {})

    # NOTE(review): "unicode_escape" also interprets backslash sequences in
    # the payload; this assumes the traffic is plain ASCII - confirm.
    text = payload.decode("unicode_escape")
    timestamp = float(packet.time)

    if is_response:
        command = pending.get(mcu_ip)
        if command is not None:
            _store_responses(mcu_entry, command, payload, text, timestamp)
        return

    # Remember the command per MCU so that a later response from the same
    # address is paired with the requests which were sent to that address.
    pending[mcu_ip] = {
        "payload": payload,
        "text": text,
        "timestamp": timestamp,
        "requests": _store_command(mcu_entry, payload, text, timestamp),
    }


def parse(fileAndPath):
    """
    Parse a PCAP file into a nested dictionary of commands and responses.

    The result is organised as ``{ip: {key: {timestamp: data}}}``. ``ip`` is
    the destination address of a command or the source address of a response.
    For a request, ``key`` is the command itself and ``data`` holds the
    'command' and 'response' entries. For a command which sends a value,
    ``key`` is the command name followed by "=" and ``data`` only holds the
    'command' entry.

    Args:
        fileAndPath: Path of the PCAP file to read.

    Returns:
        The nested dictionary described above. It is empty if the file could
        not be opened as a PCAP file (a message is printed in that case) or if
        no matching packets were found.

    Raises:
        ImportError: If the Scapy package is not installed.
    """
    if not SCAPY_AVAILABLE:
        raise ImportError(SCAPY_MISSING_MESSAGE)

    try:
        reader = PcapReader(fileAndPath)
    except Exception:
        print(f"Could not read file {fileAndPath} as PCAP file")
        return {}

    json_dict = {}
    pending = {}
    try:
        for packet in reader:
            _process_packet(packet, json_dict, pending)
    finally:
        reader.close()
    return json_dict


def main(argv=None):
    """
    Convert every PCAP file given on the command line into a JSON file.

    The JSON file is written to the current working directory and is named
    after the base name of the input file with ".json" appended.

    Args:
        argv: Argument list including the program name. Defaults to sys.argv.

    Returns:
        0 on success (or after printing the manual), 1 if Scapy is missing.
    """
    args = sys.argv if argv is None else list(argv)

    # Showing the manual does not need Scapy, so this check comes first.
    if len(args) < 2:
        print(MANUAL)
        return 0

    if not SCAPY_AVAILABLE:
        print(SCAPY_MISSING_MESSAGE)
        return 1

    for capture_path in args[1:]:
        json_dict = parse(capture_path)

        # Save the dict into a JSON file
        json_path = f"{os.path.basename(capture_path)}.json"
        with open(json_path, "w", encoding="utf-8") as fp:
            json.dump(json_dict, fp, indent=4)

        print(f"Stored parse result of {capture_path} in {json_path}")
    return 0


if __name__ == "__main__":
    sys.exit(main())