This repository has been archived on 2025-04-15. You can view files and clone it, but cannot push or open issues or pull requests.
2023-06-05 17:40:58 +02:00

188 lines
6.1 KiB
Python

import json
from pathlib import Path
import numpy as np
import re
from . import RawFileReader
from .enums import DetectorType
class RawFile:
    """
    Generic raw file reader. Picks up settings from a master file.

    The master file is either a ``.json`` file (loaded as-is) or a text
    ``.raw`` master file (parsed by ``_load_raw_master``). Based on the
    'Detector Type' and 'Analog Samples' entries a ``RawFileReader`` is
    opened on the first data file of the run.

    Currently handled: 'Moench' and 'ChipTestBoard' with 5000 analog samples.

    The object can be iterated over (one ``reader.read()`` result per step,
    stopping when the result has no rows) and used in a ``with`` statement.
    """

    # Raw master fields whose first whitespace separated token is an integer
    _INT_FIELDS = frozenset(
        (
            "Analog Samples",
            "Analog Flag",
            "Digital Flag",
            "Digital Samples",
            "Max Frames Per File",
            "Image Size",
            "Frame Padding",
            "Total Frames",
            "Dynamic Range",
            "Ten Giga",
            "Quad",
            "Number of Lines read out",
            "Number of UDP Interfaces",
        )
    )

    # Fields holding a duration written as <number><unit>, e.g. "10us"
    _TIME_FIELDS = frozenset(
        (
            "Exptime",
            "Exptime1",
            "Exptime2",
            "Exptime3",
            "GateDelay1",
            "GateDelay2",
            "GateDelay3",
            "SubExptime",  # Eiger
            "SubPeriod",  # Eiger
            "Period",
        )
    )

    # Conversion factors from the supported time units to nanoseconds
    _NS_PER_UNIT = {"s": 1000 * 1000 * 1000, "ms": 1000 * 1000, "us": 1000, "ns": 1}

    def __init__(self, fname, header = False):
        """
        Load the master file and open a reader on the first data file.

        Parameters
        ----------
        fname : str or Path
            Master file name; the suffix must be '.json' or '.raw' and the
            stem must look like ``<base>_<anything>_<run_id>``.
        header : bool
            Forwarded unchanged to ``RawFileReader``.

        Raises
        ------
        ValueError
            If the suffix is not supported, the file name cannot be parsed
            or the detector configuration is not handled.
        """
        self.findex = 0
        # Store a Path (not the raw argument): _parse_fname needs .stem and
        # .parent, which a plain str does not provide.
        self.fname = Path(fname)
        self.json = False  # Master file data comes from json

        suffix = self.fname.suffix
        if suffix == '.json':
            with open(self.fname) as f:
                self.master = json.load(f)
            self.json = True
            # NOTE(review): _parse_values() is never called for json master
            # files, so its json branch is currently unused -- confirm
            # whether that is intended.
        elif suffix == '.raw':
            with open(self.fname) as f:
                self._load_raw_master(f)
        else:
            raise ValueError(f'{suffix} not supported')

        # Figure out which file to open
        detector = self.master['Detector Type']
        analog_samples = self.master['Analog Samples']
        if detector == 'Moench' and analog_samples == 5000:
            # TODO! pass settings to reader
            self._parse_fname()
            self.reader = RawFileReader(
                self.data_fname(0, 0), DetectorType.MOENCH_03, header = header
            )
        elif detector == 'ChipTestBoard' and analog_samples == 5000:
            self._parse_fname()
            # Do we read analog or analog+digital
            # NOTE(review): both branches select MOENCH_04_AD, so the
            # 'Digital Flag' has no effect. Looks like the else branch was
            # meant to pick an analog-only type -- confirm against
            # DetectorType before changing.
            if self.master['Digital Flag'] == 1:
                dt = DetectorType.MOENCH_04_AD
            else:
                dt = DetectorType.MOENCH_04_AD
            self.reader = RawFileReader(self.data_fname(0, 0), dt, header = header)
        else:
            raise ValueError('unsupported file')

    def _parse_fname(self):
        """
        Split the master file name into ``self.base`` and ``self.run_id``.

        The stem is split from the right on '_' into three parts: the base,
        an ignored middle part and the integer run id. ``self.base`` also
        carries the parent directory.

        Raises
        ------
        ValueError
            If the stem does not have that shape.
        """
        try:
            fname = Path(self.fname)
            base, _, run_id = fname.stem.rsplit("_", 2)
            run_id = int(run_id)
        except (TypeError, ValueError) as err:
            raise ValueError(
                f"Could not parse master file name: {self.fname}"
            ) from err
        self.base = fname.parent / base
        self.run_id = run_id

    def _load_raw_master(self, f):
        """
        Parse a text master file from the open file object ``f``.

        Everything before the first line starting with '#Frame' is read as
        ``field : value`` pairs into ``self.master``; everything after it
        goes into ``self.master["Frame Header"]``. Values are then
        converted by ``_parse_values``.
        """
        self.master = {}
        it = iter(f)
        for line in it:
            if line.startswith("#Frame"):
                break
            if line == "\n":
                continue
            if line.startswith("Scan Parameters"):
                # The list may span several lines; join until the closing
                # bracket. Stop quietly at end of file instead of leaking
                # StopIteration to the caller.
                while not line.endswith("]\n"):
                    continuation = next(it, None)
                    if continuation is None:
                        break
                    line += continuation
            field, value = line.split(":", 1)
            self.master[field.strip(" ")] = value.strip(" \n")

        frame_header = {}
        for line in it:
            # Blank (e.g. trailing) lines carry no field and used to crash
            # the split below.
            if not line.strip():
                continue
            field, value = line.split(":", 1)
            frame_header[field.strip()] = value.strip(" \n")

        self.master["Frame Header"] = frame_header
        self.master["Version"] = float(self.master["Version"])
        self._parse_values()

    def _parse_values(self):
        """
        Convert the entries of ``self.master`` to Python/numpy types in place.

        Time fields become ``np.timedelta64`` in ns. Depending on
        ``self.json`` either the json layout ('Pixels'/'Geometry' dicts) or
        the text layout (strings) is converted.

        Raises
        ------
        ValueError
            If a time field cannot be converted, or if the number of
            'Rate Corrections' entries differs from a known 'nmod'.
        """
        # some fields might not exist for all detectors
        # hence using intersection
        for field in self._TIME_FIELDS.intersection(self.master):
            self.master[field] = self.to_nanoseconds(self.master[field])

        # Parse both .json and .raw master files
        if self.json:
            self.master['Image Size'] = self.master["Image Size in bytes"]
            pixels = self.master['Pixels']
            self.master['Pixels'] = (pixels['x'], pixels['y'])
            geometry = self.master['Geometry']
            self.master['nmod'] = int(geometry['x'] * geometry['y'])  # ports not modules
            if self.master['Detector Type'] == 'Eiger':
                self.master['nmod'] = self.master['nmod'] // 2
        else:
            for field in self._INT_FIELDS.intersection(self.master):
                # Values may carry a trailing unit; keep the leading number
                self.master[field] = int(self.master[field].split()[0])
            self.master["Pixels"] = tuple(
                int(i) for i in self.master["Pixels"].strip("[]").split(",")
            )
            if "Rate Corrections" in self.master:
                self.master["Rate Corrections"] = (
                    self.master["Rate Corrections"].strip("[]").split(",")
                )
                n = len(self.master["Rate Corrections"])
                # 'nmod' is not set anywhere on this path, so only check
                # consistency when the master file happened to provide it.
                nmod = self.master.get("nmod")
                if nmod is not None and nmod != n:
                    raise ValueError(
                        f'nmod from Rate Corrections {n} differs from nmod {nmod}'
                    )

        # Parse threshold for Mythen3 (if needed)
        if "Threshold Energies" in self.master:
            th = self.master["Threshold Energies"]
            if isinstance(th, str):
                th = [int(i) for i in th.strip('[]').split(',')]
            self.master["Threshold Energies"] = th

    @staticmethod
    def to_nanoseconds(t):
        """
        Convert a string like '10us' or '1.5 ms' to ``np.timedelta64`` in ns.

        Supported units: s, ms, us, ns. The result is rounded to the nearest
        nanosecond so that binary floating point error (e.g. 1.005 * 1000
        evaluating to 1004.99...) does not lose a nanosecond.

        Raises
        ------
        ValueError
            If ``t`` is not a string of the form <number><unit>.
        """
        try:
            value = re.match(r"(\d+(?:\.\d+)?)", t).group()
            unit = t[len(value):].strip()
            nanoseconds = round(float(value) * RawFile._NS_PER_UNIT[unit])
            return np.timedelta64(nanoseconds, 'ns')
        except (AttributeError, KeyError, TypeError, ValueError) as err:
            raise ValueError(f"Could not convert: {t} to nanoseconds") from err

    def data_fname(self, i, findex=0):
        """Return the data file path ``<base>_d<i>_f<findex>_<run_id>.raw``."""
        return Path(f"{self.base}_d{i}_f{findex}_{self.run_id}.raw")

    def read(self, *args):
        """Forward to ``self.reader.read`` and return its result."""
        return self.reader.read(*args)

    # Support iteration
    def __iter__(self):
        return self

    def __next__(self):
        """Return the next read result; stop once a read has no rows."""
        res = self.reader.read()
        if res.shape[0] == 0:
            raise StopIteration
        # Hand the data to the caller; without this every step yielded None
        return res

    # Support with statement
    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        pass