228 lines
6.9 KiB
Python
228 lines
6.9 KiB
Python
# gfattori - 05.07.2024
|
|
# dumb reader of logfiles
|
|
# Various helpers, largely inspired from log reader used in the gating system (https://git.psi.ch/cpt_bioeng/ots/)
|
|
|
|
|
|
import struct
|
|
|
|
def id_to_uint32(pcID):
|
|
# Initialize a 32-bit value to zero
|
|
val = 0
|
|
|
|
# Calculate the length of the input string
|
|
n = len(pcID)
|
|
|
|
# Consider only the first 4 characters in the input string
|
|
if n > 4:
|
|
n = 4
|
|
|
|
# Copy the first n characters to the output 32-bit integer
|
|
val_bytes = pcID[:n].encode('utf-8')
|
|
val = struct.unpack('!I', val_bytes.ljust(4, b'\0'))[0]
|
|
|
|
return val
|
|
|
|
def uint32_to_id(val):
|
|
# Pack the 32-bit integer into bytes
|
|
val_bytes = struct.pack('!I', val)
|
|
|
|
# Decode the bytes to a string
|
|
pcID = val_bytes.rstrip(b'\0').decode('utf-8')
|
|
|
|
return pcID
|
|
|
|
def read_nbo(pCh):
|
|
return struct.unpack('!I', pCh[:4])[0]
|
|
|
|
def read_string(data, start_index):
|
|
# Helper function to read an unsigned 32-bit integer from the data (big-endian)
|
|
def read_uint32(data, index):
|
|
return int.from_bytes(data[index:index+4], byteorder='big')
|
|
|
|
# Determine the string length
|
|
string_length = read_uint32(data, start_index)
|
|
start_index += 4
|
|
|
|
# Read the string based on the determined length
|
|
result = data[start_index:start_index + string_length].rstrip(b'\0').decode('utf-8')
|
|
start_index += string_length
|
|
|
|
# Calculate the allocated space for the string
|
|
string_space_allocation = 4 * (1 + ((string_length - 1) // 4))
|
|
bytes_read = 4 + string_space_allocation
|
|
|
|
return bytes_read, result
|
|
|
|
def bmms_index_bin_block_pos(pcDataBuf):
|
|
|
|
GenBlockList = []
|
|
iMemRunner = 0
|
|
|
|
while iMemRunner < pcDataBuf.size():
|
|
|
|
blk = {'id': read_nbo(pcDataBuf[iMemRunner:iMemRunner+4]),
|
|
'start': iMemRunner,
|
|
'len': read_nbo(pcDataBuf[iMemRunner+4:iMemRunner+8])
|
|
}
|
|
GenBlockList.append(blk)
|
|
iMemRunner += blk['len']
|
|
|
|
return GenBlockList
|
|
|
|
def bmms_index_bin_entry_pos(pcDataBuf):
|
|
|
|
GenEntryList = []
|
|
iMemRunner = 16
|
|
|
|
while iMemRunner < len(pcDataBuf)-4:
|
|
lenAndTyp = read_nbo(pcDataBuf[iMemRunner+4:iMemRunner+8])
|
|
etr = {'id': read_nbo(pcDataBuf[iMemRunner:iMemRunner+4]),
|
|
'start': iMemRunner,
|
|
'len': (lenAndTyp & 0x00FFFFFF),
|
|
'typ': hex(lenAndTyp & 0xFF000000)
|
|
}
|
|
GenEntryList.append(etr)
|
|
iMemRunner += etr['len']
|
|
|
|
return GenEntryList
|
|
|
|
def bmms_readData_bin_blockList(pcDataBuf, blkIdx):
|
|
|
|
blkList = []
|
|
for mBlk in blkIdx:
|
|
etrList = []
|
|
for mEtr in mBlk['Entries']:
|
|
etrList.append(bmms_bin_entry_genericparser(pcDataBuf[mEtr['AbsStart']:mEtr['AbsStart']+mEtr['len']]))
|
|
|
|
blk = {'Type':uint32_to_id(mBlk['id']),'entries':etrList}
|
|
blkList.append(blk)
|
|
|
|
return blkList
|
|
|
|
def bmms_bin_entry_genericparser(pcDataBuf):
|
|
|
|
st_ReadedBytes = 0
|
|
# Read the Entry ID
|
|
uiID = struct.unpack('!I', pcDataBuf[:4])[0]
|
|
st_ReadedBytes += 4
|
|
|
|
# Read length and type
|
|
uiLengthAndType = read_nbo(pcDataBuf[st_ReadedBytes:st_ReadedBytes + 4])
|
|
st_ReadedBytes += 4
|
|
# Save entry size and type
|
|
uiSize = uiLengthAndType & 0x00FFFFFF
|
|
uiType = uiLengthAndType & 0xFF000000
|
|
|
|
mEntry = {'Type':0,'Val':0}
|
|
mEntry['Type'] = uint32_to_id(uiID)
|
|
|
|
match uiType:
|
|
|
|
case 0x01000000 | 0xa1000000 :
|
|
# int32 and int32 array
|
|
|
|
if (uiSize - st_ReadedBytes) % 4 != 0:
|
|
mEntry['Val'] = []
|
|
return mEntry
|
|
|
|
nIntValues = int((uiSize - st_ReadedBytes) / 4)
|
|
|
|
mList = []
|
|
for i in range(nIntValues):
|
|
mList.append(struct.unpack('!i', pcDataBuf[st_ReadedBytes:st_ReadedBytes + 4][:4])[0])
|
|
st_ReadedBytes += 4
|
|
|
|
mEntry['Val'] = mList
|
|
|
|
case 0x02000000 | 0xa2000000 :
|
|
# uint32 and uint32 array
|
|
|
|
if (uiSize - st_ReadedBytes) % 4 != 0:
|
|
mEntry['Val'] = []
|
|
return mEntry
|
|
|
|
nIntValues = int((uiSize - st_ReadedBytes) / 4)
|
|
|
|
mList = []
|
|
for i in range(nIntValues):
|
|
mList.append(read_nbo(pcDataBuf[st_ReadedBytes:st_ReadedBytes + 4]))
|
|
st_ReadedBytes += 4
|
|
|
|
mEntry['Val'] = mList
|
|
|
|
case 0x03000000 |0xa3000000 :
|
|
# float32 and float32 array
|
|
|
|
if (uiSize - st_ReadedBytes) % 4 != 0:
|
|
mEntry['Val'] = []
|
|
return mEntry
|
|
|
|
nIntValues = int((uiSize - st_ReadedBytes) / 4)
|
|
|
|
mList = []
|
|
for i in range(nIntValues):
|
|
mList.append(struct.unpack('!f', pcDataBuf[st_ReadedBytes:st_ReadedBytes + 4][:4])[0])
|
|
st_ReadedBytes += 4
|
|
|
|
mEntry['Val'] = mList
|
|
|
|
case 0x04000000 | 0xa4000000:
|
|
# double64 and double64 array
|
|
|
|
if (uiSize - st_ReadedBytes) % 8 != 0:
|
|
mEntry['Val'] = []
|
|
return mEntry
|
|
|
|
nIntValues = int((uiSize - st_ReadedBytes) / 8)
|
|
|
|
mList = []
|
|
for i in range(nIntValues):
|
|
mList.append(struct.unpack('!d', pcDataBuf[st_ReadedBytes:st_ReadedBytes + 8][:8])[0])
|
|
st_ReadedBytes += 8
|
|
|
|
mEntry['Val'] = mList
|
|
|
|
case 0x05000000 :
|
|
# string
|
|
mEntry['Val'] = pcDataBuf[st_ReadedBytes:st_ReadedBytes + uiSize-8].rstrip(b'\0').decode('utf-8')
|
|
|
|
case 0xa5000000 :
|
|
# string array
|
|
nStrings = read_nbo(pcDataBuf[st_ReadedBytes:st_ReadedBytes + 4])
|
|
st_ReadedBytes+=4
|
|
mList = []
|
|
for _ in range (nStrings):
|
|
bytes_read, sString = read_string(pcDataBuf,st_ReadedBytes)
|
|
mList.append(sString)
|
|
st_ReadedBytes += bytes_read
|
|
|
|
mEntry['Val'] = mList
|
|
|
|
|
|
case 0xc5000000 :
|
|
# string-string map
|
|
nStrings = read_nbo(pcDataBuf[st_ReadedBytes:st_ReadedBytes + 4])
|
|
st_ReadedBytes+=4
|
|
mList = []
|
|
for _ in range (int(nStrings/2)):
|
|
bytes_read, sString_1 = read_string(pcDataBuf,st_ReadedBytes)
|
|
st_ReadedBytes += bytes_read
|
|
bytes_read, sString_2 = read_string(pcDataBuf,st_ReadedBytes)
|
|
st_ReadedBytes += bytes_read
|
|
mList.append(tuple([sString_1,sString_2]))
|
|
mEntry['Val'] = mList
|
|
|
|
case other:
|
|
return mEntry
|
|
|
|
return mEntry
|
|
|
|
def bmms_bin_indexer(pcDataBuf):
|
|
BlkList = []
|
|
BlkList = bmms_index_bin_block_pos(pcDataBuf)
|
|
for mBlk in BlkList:
|
|
mBlk['Entries'] = bmms_index_bin_entry_pos(pcDataBuf[mBlk['start']: mBlk['start'] + mBlk['len'] ])
|
|
for mEtr in mBlk['Entries']:
|
|
mEtr['AbsStart'] = mBlk['start'] + mEtr['start']
|
|
|
|
return BlkList |