dumb log reader and example of use

This commit is contained in:
Giovanni Fattori
2024-07-05 14:54:34 +02:00
parent 7be9855737
commit de79a7adba
6 changed files with 373 additions and 0 deletions

View File

@ -0,0 +1,134 @@
# gfattori - 05.07.2024
# dumb reader of logfiles
# Various helpers, largely inspired from log reader used in the gating system (https://git.psi.ch/cpt_bioeng/ots/)
import struct
def id_to_uint32(pcID):
# Initialize a 32-bit value to zero
val = 0
# Calculate the length of the input string
n = len(pcID)
# Consider only the first 4 characters in the input string
if n > 4:
n = 4
# Copy the first n characters to the output 32-bit integer
val_bytes = pcID[:n].encode('utf-8')
val = struct.unpack('!I', val_bytes.ljust(4, b'\0'))[0]
return val
def uint32_to_id(val):
# Pack the 32-bit integer into bytes
val_bytes = struct.pack('!I', val)
# Decode the bytes to a string
pcID = val_bytes.rstrip(b'\0').decode('utf-8')
return pcID
def read_nbo(pCh):
return struct.unpack('!I', pCh[:4])[0]
def bmms_index_bin_block_pos(pcDataBuf):
GenBlockList = []
iMemRunner = 0
while iMemRunner < pcDataBuf.size():
blk = {'id': read_nbo(pcDataBuf[iMemRunner:iMemRunner+4]),
'start': iMemRunner,
'len': read_nbo(pcDataBuf[iMemRunner+4:iMemRunner+8])
}
GenBlockList.append(blk)
iMemRunner += blk['len']
return GenBlockList
def bmms_index_bin_entry_pos(pcDataBuf):
GenEntryList = []
iMemRunner = 16
while iMemRunner < len(pcDataBuf)-16:
lenAndTyp = read_nbo(pcDataBuf[iMemRunner+4:iMemRunner+8])
etr = {'id': read_nbo(pcDataBuf[iMemRunner:iMemRunner+4]),
'start': iMemRunner,
'len': (lenAndTyp & 0x00FFFFFF),
'typ': hex(lenAndTyp & 0xFF000000)
}
GenEntryList.append(etr)
iMemRunner += etr['len']
return GenEntryList
def bmms_readData_bin_blockList(pcDataBuf, blkIdx):
blkList = []
for mBlk in blkIdx:
etrList = []
for mEtr in mBlk['Entries']:
etrList.append(bmms_bin_entry_genericparser(pcDataBuf[mEtr['AbsStart']:mEtr['AbsStart']+mEtr['len']]))
blk = {'Type':uint32_to_id(mBlk['id']),'entries':etrList}
blkList.append(blk)
return blkList
def bmms_bin_entry_genericparser(pcDataBuf):
st_ReadedBytes = 0
# Read the Entry ID
uiID = struct.unpack('!I', pcDataBuf[:4])[0]
st_ReadedBytes += 4
# Read length and type
uiLengthAndType = read_nbo(pcDataBuf[st_ReadedBytes:st_ReadedBytes + 4])
st_ReadedBytes += 4
# Save entry size and type
uiSize = uiLengthAndType & 0x00FFFFFF
uiType = uiLengthAndType & 0xFF000000
mEntry = {'Type':0,'Val':0}
mEntry['Type'] = uint32_to_id(uiID)
mList = []
match uiType:
case 0x01000000 | 0x02000000 | 0x03000000 | 0xa1000000 | 0xa2000000 | 0xa3000000 :
if (uiSize - st_ReadedBytes) % 4 != 0:
mEntry['Val'] = []
return mEntry
nIntValues = int((uiSize - st_ReadedBytes) / 4)
for i in range(nIntValues):
mList.append(read_nbo(pcDataBuf[st_ReadedBytes:st_ReadedBytes + 4]))
st_ReadedBytes += 4
mEntry['Val'] = mList
case 0x05000000:
mEntry['Val'] = pcDataBuf[st_ReadedBytes:st_ReadedBytes + uiSize-8].rstrip(b'\0').decode('utf-8')
case other:
return mEntry
return mEntry
def bmms_bin_indexer(pcDataBuf):
BlkList = []
BlkList = bmms_index_bin_block_pos(pcDataBuf)
for mBlk in BlkList:
mBlk['Entries'] = bmms_index_bin_entry_pos(pcDataBuf[mBlk['start']: mBlk['start'] + mBlk['len'] ])
for mEtr in mBlk['Entries']:
mEtr['AbsStart'] = mBlk['start'] + mEtr['start']
return BlkList

0
binReader/__init__.py Normal file
View File

110
binReader/bmmsLogReader.py Normal file
View File

@ -0,0 +1,110 @@
# gfattori - 05.07.2024
# dumb reader of logfiles
# Porting of bmms_mLogReader used in the gating system (https://git.psi.ch/cpt_bioeng/ots/)
import os
import mmap
class BMMSLogReader:
def __init__(self):
self.eReaderStatus = 'logFileClosed'
self.iFd = None
self.pcFName = None
self.s_file_info = None
self.pcMemBlock = None
def bmms_bin_log_load(psBMMSLOGREADER, pcc_fName):
# Initialize variables
iRes = 0
iFileDescriptor = 0
st_fnamelen = 0
st_FileLength = 0
# Check if a log is already loaded and unload it if necessary
if psBMMSLOGREADER.eReaderStatus != 'logFileClosed':
bmms_bin_log_unload(psBMMSLOGREADER)
psBMMSLOGREADER.eReaderStatus = 'logFileClosed'
# Check if the file exists
if not os.path.exists(pcc_fName):
print(f"{fNamePrefix} File not found")
psBMMSLOGREADER.eReaderStatus = 'logFileClosed'
return -1
try:
psBMMSLOGREADER.iFd = open(pcc_fName, "a+b")
except IOError:
print(f"{fNamePrefix} File error")
return -1
# Copy the filename
st_fnamelen = len(pcc_fName)
psBMMSLOGREADER.pcFName = pcc_fName
# Get the file descriptor
iFileDescriptor = psBMMSLOGREADER.iFd.fileno()
# Get information about the file
try:
psBMMSLOGREADER.s_file_info = os.fstat(iFileDescriptor)
except OSError:
print(f"{fNamePrefix} File error: fStat")
psBMMSLOGREADER.iFd.close()
psBMMSLOGREADER.pcFName = None
psBMMSLOGREADER.eReaderStatus = 'logFileClosed'
return -1
print(f"{fNamePrefix} Size: {psBMMSLOGREADER.s_file_info.st_size}")
st_FileLength = psBMMSLOGREADER.s_file_info.st_size
if st_FileLength == 0:
print(f"{fNamePrefix} file size 0: fStat")
psBMMSLOGREADER.iFd.close()
psBMMSLOGREADER.pcFName = None
psBMMSLOGREADER.eReaderStatus = 'logFileClosed'
return -1
# Make sure the file is an ordinary file
if not os.path.isfile(pcc_fName):
print(f"{fNamePrefix} File error: !S_ISREG.")
psBMMSLOGREADER.iFd.close()
psBMMSLOGREADER.pcFName = None
return -1
# Map the file into memory
try:
#psBMMSLOGREADER.pcMemBlock = mmap.mmap(iFileDescriptor, 0, mmap.MAP_SHARED, mmap.PROT_READ)
psBMMSLOGREADER.pcMemBlock = mmap.mmap(iFileDescriptor, psBMMSLOGREADER.s_file_info.st_size, mmap.MAP_SHARED, mmap.PROT_READ)
except Exception as e:
print(f"{fNamePrefix} File error: MAP_FAILED")
psBMMSLOGREADER.iFd.close()
psBMMSLOGREADER.pcMemBlock = None
psBMMSLOGREADER.pcFName = None
return -1
print(f"{fNamePrefix} {pcc_fName} loaded.")
psBMMSLOGREADER.eReaderStatus = 'logFileOpened'
# Call BMMSmLogCheckBinLogIntegrity
iRes = bmms_m_log_check_bin_log_integrity(psBMMSLOGREADER)
print(f"{fNamePrefix} log file loaded! ;)")
return iRes
def bmms_bin_log_unload(psBMMSLOGREADER):
# Unmap memory and close the file descriptor
if psBMMSLOGREADER.pcMemBlock:
psBMMSLOGREADER.pcMemBlock.close()
if psBMMSLOGREADER.iFd:
psBMMSLOGREADER.iFd.close()
psBMMSLOGREADER.pcMemBlock = None
psBMMSLOGREADER.iFd = None
psBMMSLOGREADER.pcFName = None
def bmms_m_log_check_bin_log_integrity(psBMMSLOGREADER):
# Placeholder for the actual integrity check logic
return 0
# Constants
fNamePrefix = "LogReader"