Files
binlogreader/binReader/bmmsLogReader.py
2024-07-05 14:54:34 +02:00

111 lines
3.4 KiB
Python

# gfattori - 05.07.2024
# dumb reader of logfiles
# Porting of bmms_mLogReader used in the gating system (https://git.psi.ch/cpt_bioeng/ots/)
import os
import mmap
class BMMSLogReader:
    """State holder shared by the binary log reader functions.

    A freshly created reader is in the ``'logFileClosed'`` state and has no
    file, file name, stat information or memory block attached.
    """

    def __init__(self):
        # Reader state: 'logFileClosed' until a log has been loaded.
        self.eReaderStatus = 'logFileClosed'
        # iFd: open file object, pcFName: path of the log,
        # s_file_info: os.fstat() result, pcMemBlock: memory map of the file.
        # All of them are filled in by bmms_bin_log_load().
        self.iFd = self.pcFName = self.s_file_info = self.pcMemBlock = None
def _bmms_bin_log_load_failed(psBMMSLOGREADER, message):
    """Report a load failure, release what was acquired so far, return -1."""
    print(f"{fNamePrefix} {message}")
    # bmms_bin_log_unload closes the map/file and clears iFd, pcFName and
    # pcMemBlock, so no closed file object is left behind on the reader.
    bmms_bin_log_unload(psBMMSLOGREADER)
    psBMMSLOGREADER.s_file_info = None
    psBMMSLOGREADER.eReaderStatus = 'logFileClosed'
    return -1


def bmms_bin_log_load(psBMMSLOGREADER, pcc_fName):
    """Open a binary log file read-only and map it into memory.

    A log that is already loaded on the reader is unloaded first. On success
    the reader holds the open file object (``iFd``), the file name
    (``pcFName``), the ``os.fstat`` result (``s_file_info``) and a read-only
    memory map of the whole file (``pcMemBlock``), and ``eReaderStatus`` is
    ``'logFileOpened'``.

    Args:
        psBMMSLOGREADER: Reader state object (a ``BMMSLogReader``).
        pcc_fName: Path of the log file to load.

    Returns:
        -1 when the file does not exist, cannot be opened or stat'ed, is
        empty, is not a regular file or cannot be mapped; otherwise the value
        returned by ``bmms_m_log_check_bin_log_integrity``.
    """
    # Check if a log is already loaded and unload it if necessary
    if psBMMSLOGREADER.eReaderStatus != 'logFileClosed':
        bmms_bin_log_unload(psBMMSLOGREADER)
    psBMMSLOGREADER.eReaderStatus = 'logFileClosed'

    # Check if the file exists
    if not os.path.exists(pcc_fName):
        print(f"{fNamePrefix} File not found")
        return -1

    try:
        # Read-only binary mode: the log is only read and mapped read-only,
        # so write permission on the file must not be required.
        log_file = open(pcc_fName, "rb")
    except OSError:
        print(f"{fNamePrefix} File error")
        return -1
    psBMMSLOGREADER.iFd = log_file
    psBMMSLOGREADER.pcFName = pcc_fName

    # Get information about the file
    try:
        psBMMSLOGREADER.s_file_info = os.fstat(log_file.fileno())
    except OSError:
        return _bmms_bin_log_load_failed(psBMMSLOGREADER, "File error: fStat")

    file_length = psBMMSLOGREADER.s_file_info.st_size
    print(f"{fNamePrefix} Size: {file_length}")
    if file_length == 0:
        # An empty file cannot be memory-mapped.
        return _bmms_bin_log_load_failed(psBMMSLOGREADER, "file size 0: fStat")

    # Make sure the file is an ordinary file
    if not os.path.isfile(pcc_fName):
        return _bmms_bin_log_load_failed(
            psBMMSLOGREADER, "File error: !S_ISREG."
        )

    # Map the file into memory. ACCESS_READ is the portable spelling of a
    # shared, read-only mapping (MAP_SHARED / PROT_READ exist on Unix only).
    try:
        psBMMSLOGREADER.pcMemBlock = mmap.mmap(
            log_file.fileno(), file_length, access=mmap.ACCESS_READ
        )
    except (OSError, ValueError, OverflowError):
        return _bmms_bin_log_load_failed(
            psBMMSLOGREADER, "File error: MAP_FAILED"
        )

    print(f"{fNamePrefix} {pcc_fName} loaded.")
    psBMMSLOGREADER.eReaderStatus = 'logFileOpened'

    # Call BMMSmLogCheckBinLogIntegrity
    iRes = bmms_m_log_check_bin_log_integrity(psBMMSLOGREADER)
    print(f"{fNamePrefix} log file loaded! ;)")
    return iRes
def bmms_bin_log_unload(psBMMSLOGREADER):
    """Release the memory map and file held by a reader and mark it closed.

    Closes ``pcMemBlock`` and ``iFd`` when they are set, then puts every
    field of the reader back to the state of a freshly created reader, so
    that ``eReaderStatus`` is ``'logFileClosed'`` after a direct call as well.

    Args:
        psBMMSLOGREADER: Reader state object (a ``BMMSLogReader``).
    """
    # Unmap memory and close the file descriptor. Compare against None
    # explicitly: the truth value of a map object goes through its length.
    mem_block = psBMMSLOGREADER.pcMemBlock
    if mem_block is not None:
        mem_block.close()
    log_file = psBMMSLOGREADER.iFd
    if log_file is not None:
        log_file.close()

    psBMMSLOGREADER.pcMemBlock = None
    psBMMSLOGREADER.iFd = None
    psBMMSLOGREADER.pcFName = None
    # Drop the stale stat result and reflect the closed state on the reader.
    psBMMSLOGREADER.s_file_info = None
    psBMMSLOGREADER.eReaderStatus = 'logFileClosed'
def bmms_m_log_check_bin_log_integrity(psBMMSLOGREADER):
    """Check the integrity of the loaded binary log (not implemented yet).

    Args:
        psBMMSLOGREADER: Reader state object; currently not inspected.

    Returns:
        Always 0.
    """
    # Placeholder: the real integrity check still has to be written, so the
    # log is reported as fine unconditionally.
    integrity_result = 0
    return integrity_result
# Constants
# Prefix placed in front of the messages printed by the log reader functions
# above. Those functions look the name up when they are called, so defining
# it after them at module level works.
fNamePrefix = "LogReader"