Files
binlogreader/binReader/LogReaderHelpers.py
2024-09-10 09:21:26 +00:00

228 lines
6.9 KiB
Python

# gfattori - 05.07.2024
# dumb reader of logfiles
# Various helpers, largely inspired by the log reader used in the gating system (https://git.psi.ch/cpt_bioeng/ots/)
import struct
def id_to_uint32(pcID):
    """Pack the leading bytes of a textual ID into an unsigned 32-bit integer.

    The ID is UTF-8 encoded and only its first four bytes are used; shorter
    IDs are right-padded with NUL bytes. The four bytes are interpreted in
    network (big-endian) byte order.

    Args:
        pcID: Identifier string, e.g. a four-character tag.

    Returns:
        The integer whose big-endian representation is those four bytes.
    """
    # Truncate AFTER encoding: slicing four characters first can produce more
    # than four bytes for non-ASCII text, which struct.unpack('!I') rejects.
    val_bytes = pcID.encode('utf-8')[:4].ljust(4, b'\0')
    return struct.unpack('!I', val_bytes)[0]
def uint32_to_id(val):
    """Turn an unsigned 32-bit integer back into its textual ID.

    The value is packed in network (big-endian) byte order, trailing NUL
    padding is removed and the remaining bytes are decoded as UTF-8.

    Args:
        val: Integer in the range 0..2**32-1.

    Returns:
        The decoded ID string. Bytes that are not valid UTF-8 are rendered as
        U+FFFD instead of raising, so that one odd ID word read from a file
        does not abort the whole parse.

    Raises:
        struct.error: If ``val`` does not fit in an unsigned 32-bit integer.
    """
    val_bytes = struct.pack('!I', val)
    return val_bytes.rstrip(b'\0').decode('utf-8', errors='replace')
def read_nbo(pCh):
    """Decode the first four bytes of ``pCh`` as a network-byte-order uint32.

    Raises ``struct.error`` when fewer than four bytes are available.
    """
    (value,) = struct.unpack('!I', pCh[:4])
    return value
def read_string(data, start_index):
    """Read one length-prefixed string from ``data`` at ``start_index``.

    Layout: a big-endian uint32 byte count, followed by that many bytes of
    UTF-8 text. The text occupies a slot rounded up to a multiple of four
    bytes.

    Args:
        data: Byte buffer holding the serialized string.
        start_index: Offset of the uint32 length prefix.

    Returns:
        A ``(bytes_read, text)`` tuple: ``bytes_read`` is the four prefix
        bytes plus the padded slot size, ``text`` is the decoded string with
        trailing NULs removed.
    """
    payload_start = start_index + 4
    text_len = int.from_bytes(data[start_index:payload_start], byteorder='big')

    raw_text = data[payload_start:payload_start + text_len]
    text = raw_text.rstrip(b'\0').decode('utf-8')

    # Round the payload length up to the next multiple of four bytes.
    padded_len = 4 * ((text_len + 3) // 4)
    return 4 + padded_len, text
def bmms_index_bin_block_pos(pcDataBuf):
    """Index the top-level blocks of a binary log buffer.

    Every block begins with two big-endian uint32 fields: the block id and
    the block length. The length is counted from the start of the block, so
    adding it to the block offset gives the offset of the next block.

    Args:
        pcDataBuf: Sliceable byte buffer (``bytes``, ``bytearray``, ``mmap``,
            ...) holding the whole log.

    Returns:
        A list of dicts, one per block, with keys ``'id'`` (raw uint32),
        ``'start'`` (offset in ``pcDataBuf``) and ``'len'`` (block length).

    Raises:
        struct.error: If the buffer ends in the middle of a block header.
    """
    GenBlockList = []
    # len() instead of .size(): it works for bytes/bytearray as well as mmap,
    # and for an mmap it is the mapped length that can actually be read.
    total_len = len(pcDataBuf)
    iMemRunner = 0
    while iMemRunner < total_len:
        block_id, block_len = struct.unpack(
            '!II', pcDataBuf[iMemRunner:iMemRunner + 8])
        if block_len == 0:
            # A zero length would never advance the cursor (endless loop);
            # treat it as the end of the usable data.
            break
        GenBlockList.append({'id': block_id,
                             'start': iMemRunner,
                             'len': block_len})
        iMemRunner += block_len
    return GenBlockList
def bmms_index_bin_entry_pos(pcDataBuf):
    """Index the entries contained in a single block.

    Scanning starts at offset 16, i.e. the first 16 bytes of the block are
    skipped (presumably the block header -- confirm against the format
    specification). Every entry begins with a big-endian uint32 id followed
    by a big-endian uint32 whose low 24 bits hold the entry length (counted
    from the start of the entry) and whose high 8 bits hold the type tag.

    Args:
        pcDataBuf: Bytes of exactly one block.

    Returns:
        A list of dicts with keys ``'id'`` (raw uint32), ``'start'`` (offset
        relative to the block), ``'len'`` (entry length) and ``'typ'`` (hex
        string of the type tag still in its high-byte position).

    Raises:
        struct.error: If the block ends in the middle of an entry header.
    """
    GenEntryList = []
    buf_len = len(pcDataBuf)
    iMemRunner = 16
    while iMemRunner < buf_len - 4:
        entry_id, lenAndTyp = struct.unpack(
            '!II', pcDataBuf[iMemRunner:iMemRunner + 8])
        entry_len = lenAndTyp & 0x00FFFFFF
        if entry_len == 0:
            # A zero length would never advance the cursor (endless loop);
            # stop indexing this block instead.
            break
        GenEntryList.append({'id': entry_id,
                             'start': iMemRunner,
                             'len': entry_len,
                             'typ': hex(lenAndTyp & 0xFF000000)})
        iMemRunner += entry_len
    return GenEntryList
def bmms_readData_bin_blockList(pcDataBuf, blkIdx):
    """Decode every entry of every indexed block.

    Args:
        pcDataBuf: Byte buffer the index refers to.
        blkIdx: Block index; each item needs an ``'id'`` and an ``'Entries'``
            list whose items carry ``'AbsStart'`` and ``'len'``.

    Returns:
        A list with one dict per block: ``'Type'`` is the block id as text,
        ``'entries'`` the list of parsed entries in index order.
    """
    decoded_blocks = []
    for block in blkIdx:
        # Parse the entries first, then resolve the block id.
        parsed_entries = [
            bmms_bin_entry_genericparser(
                pcDataBuf[entry['AbsStart']:entry['AbsStart'] + entry['len']])
            for entry in block['Entries']
        ]
        decoded_blocks.append({'Type': uint32_to_id(block['id']),
                               'entries': parsed_entries})
    return decoded_blocks
# struct format character for every fixed-width numeric type tag. Each 0x0N
# tag and its 0xaN counterpart (the array form) share one wire layout.
_BMMS_NUMERIC_FORMATS = {
    0x01000000: 'i', 0xa1000000: 'i',  # int32
    0x02000000: 'I', 0xa2000000: 'I',  # uint32
    0x03000000: 'f', 0xa3000000: 'f',  # float32
    0x04000000: 'd', 0xa4000000: 'd',  # float64
}
_BMMS_TYPE_STRING = 0x05000000
_BMMS_TYPE_STRING_ARRAY = 0xa5000000
_BMMS_TYPE_STRING_MAP = 0xc5000000
# id (4 bytes) + length/type word (4 bytes)
_BMMS_ENTRY_HEADER_LEN = 8


def _bmms_unpack_numeric_payload(pcDataBuf, offset, end, fmt_char):
    """Decode the big-endian numbers stored in ``pcDataBuf[offset:end]``.

    Returns an empty list when the payload length is not a whole multiple of
    the item width.
    """
    item = struct.Struct('!' + fmt_char)
    if (end - offset) % item.size != 0:
        return []
    # The range is empty when ``end`` lies before ``offset`` (size field
    # smaller than the header), which yields an empty list.
    return [item.unpack(pcDataBuf[pos:pos + item.size])[0]
            for pos in range(offset, end, item.size)]


def bmms_bin_entry_genericparser(pcDataBuf):
    """Parse one serialized entry into a ``{'Type': ..., 'Val': ...}`` dict.

    The entry starts with a big-endian uint32 id and a big-endian uint32
    whose low 24 bits are the entry size (header included) and whose high
    8 bits are the type tag.

    Args:
        pcDataBuf: Bytes of the entry, starting at its id field.

    Returns:
        A dict with ``'Type'`` set to the entry id as text and ``'Val'`` set
        according to the type tag:

        * int32 / uint32 / float32 / float64 (scalar or array): list of
          numbers, or ``[]`` when the payload size is misaligned;
        * string: ``str``;
        * string array: list of ``str``;
        * string-string map: list of ``(key, value)`` tuples;
        * any other tag: ``0``.

    Raises:
        struct.error: If the buffer is shorter than the header or than the
            numeric payload announced by the size field.
    """
    uiID = read_nbo(pcDataBuf[0:4])
    uiLengthAndType = read_nbo(pcDataBuf[4:8])
    uiSize = uiLengthAndType & 0x00FFFFFF
    uiType = uiLengthAndType & 0xFF000000

    mEntry = {'Type': 0, 'Val': 0}
    mEntry['Type'] = uint32_to_id(uiID)

    offset = _BMMS_ENTRY_HEADER_LEN
    buf_len = len(pcDataBuf)

    fmt_char = _BMMS_NUMERIC_FORMATS.get(uiType)
    if fmt_char is not None:
        mEntry['Val'] = _bmms_unpack_numeric_payload(
            pcDataBuf, offset, uiSize, fmt_char)

    elif uiType == _BMMS_TYPE_STRING:
        # The payload runs from the end of the header to the end of the entry.
        mEntry['Val'] = pcDataBuf[offset:uiSize].rstrip(b'\0').decode('utf-8')

    elif uiType == _BMMS_TYPE_STRING_ARRAY:
        nStrings = read_nbo(pcDataBuf[offset:offset + 4])
        offset += 4
        mList = []
        for _ in range(nStrings):
            if offset >= buf_len:
                # Nothing left to read: a corrupt count must not spin for up
                # to 2**32 iterations appending phantom empty strings.
                break
            bytes_read, sString = read_string(pcDataBuf, offset)
            mList.append(sString)
            offset += bytes_read
        mEntry['Val'] = mList

    elif uiType == _BMMS_TYPE_STRING_MAP:
        # The count field is halved: strings are consumed as key/value pairs.
        nStrings = read_nbo(pcDataBuf[offset:offset + 4])
        offset += 4
        mList = []
        for _ in range(nStrings // 2):
            if offset >= buf_len:
                # Same guard as for string arrays: stop at the buffer end.
                break
            bytes_read, sKey = read_string(pcDataBuf, offset)
            offset += bytes_read
            bytes_read, sValue = read_string(pcDataBuf, offset)
            offset += bytes_read
            mList.append((sKey, sValue))
        mEntry['Val'] = mList

    # Unknown type tags fall through and keep the default 'Val' of 0.
    return mEntry
def bmms_bin_indexer(pcDataBuf):
    """Build the full block/entry index of a binary log buffer.

    Each block dict returned by ``bmms_index_bin_block_pos`` gains an
    ``'Entries'`` list, and each entry gains ``'AbsStart'``: its offset
    relative to the whole buffer rather than to its block.

    Args:
        pcDataBuf: Byte buffer holding the whole log.

    Returns:
        The list of block dicts, each with its indexed entries.
    """
    blocks = bmms_index_bin_block_pos(pcDataBuf)
    for block in blocks:
        block_start = block['start']
        entries = bmms_index_bin_entry_pos(
            pcDataBuf[block_start:block_start + block['len']])
        block['Entries'] = entries
        for entry in entries:
            # Entry offsets are block-relative; convert to buffer-absolute.
            entry['AbsStart'] = block_start + entry['start']
    return blocks