Files
eos/eos/file_reader.py
T
glavic_a 6ccf7e8028
Unit Testing / test (3.10) (push) Successful in 50s
Unit Testing / test (3.11) (push) Successful in 49s
Unit Testing / test (3.12) (push) Successful in 51s
Unit Testing / test (3.8) (push) Successful in 47s
Unit Testing / test (3.9) (push) Successful in 49s
Update start_time_fallback location
2026-06-01 12:46:23 +02:00

614 lines
27 KiB
Python

"""
Reading of Amor NeXus data files to extract metadata and event stream.
"""
from typing import BinaryIO, List, Union
import sys
import h5py
import numpy as np
import logging
import yaml
from datetime import datetime
from orsopy import fileio
from orsopy.fileio.model_language import SampleModel
from . import const, compat
from .header import Header
from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, LOG_TYPE, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, \
PC_TYPE, BM_TYPE, DA00EventBuffer
try:
import zoneinfo
except ImportError:
# for python versions < 3.9 try to use the backports version
from backports import zoneinfo
# Time zone used to interpret time strings
AMOR_LOCAL_TIMEZONE = zoneinfo.ZoneInfo(key='Europe/Zurich')
UTC = zoneinfo.ZoneInfo(key='UTC')
NO_DEFAULT_VALUE = object() # just for allowing None default value in AmorHeader.rv
class AmorHeader:
"""
Collects header information from Amor NeXus file without reading event data.
"""
# mapping of names to (hdf_path, dtype, nicos_key[, suffix])
hdf_paths = dict(
title=('entry1/title', str),
proposal_id=('entry1/proposal_id', str),
user_name=('entry1/user/name', str),
user_email=('entry1/user/email', str),
user_affiliation=('entry1/user/affiliation', str),
sample_name=('entry1/sample/name', str),
sample_model=('entry1/sample/model', str),
sample_geometry=('entry1/sample/geometry', str),
source_name=('entry1/Amor/source/name', str),
start_time=('entry1/start_time', str),
start_time_fallback=('entry1/measurement_configuration/start_time', str),
end_time=('entry1/end_time', str),
chopper_separation=('entry1/Amor/chopper/pair_separation', float),
chopper_distance=('entry1/Amor/chopper/distance', float),
detector_distance=('entry1/Amor/detector_stage/transformation/distance', float),
sample_temperature=('entry1/sample/environment/temperature', float),
sample_magnetic_field=('entry1/sample/environment/magnetic_field', float),
sample_current=('entry1/sample/environment/current', float),
sample_voltage=('entry1/sample/environment/voltage', float),
sample_gas_pressure=('entry1/sample/environment/gas_pressure', float),
mu=('entry1/measurement_configuration/mu', float, 'mu'),
nu=('entry1/measurement_configuration/nu', float, 'nu'),
kap=('entry1/measurement_configuration/kappa', float, 'kappa'),
kad=('entry1/measurement_configuration/kappa_offset', float, 'kappa_offset'),
div=('entry1/measurement_configuration/div', float, 'div'),
virtual_source_horizontal=('entry1/Amor/virtual_source/horizontal', float),
virtual_source_vertical=('entry1/Amor/virtual_source/vertical', float),
ch1_trigger_phase=('entry1/Amor/chopper/ch1_trigger_phase', float, 'ch1_trigger_phase'),
ch2_trigger_phase=('entry1/Amor/chopper/ch2_trigger_phase', float, 'ch2_trigger_phase'),
chopper_speed=('entry1/Amor/chopper/rotation_speed', float, 'chopper_speed'),
chopper_phase=('entry1/Amor/chopper/phase', float, 'chopper_phase'),
polarization_config_label=('entry1/Amor/polarization/configuration', int, 'polarization_config_label', '/*'),
data=('entry1/detector/data', None), # data group used to load events from
trigger=('entry1/detector/trigger', float),
beam_monitor=('entry1/beam_monitor', None), # special data group (da00 flat-buffer scheme)
proton_current=('entry1/detector/proton_current', float),
acquisition_filter=('entry1/detector/acquisition_filter', int),
)
hdf: h5py.File
# environment parameters from above used for sample header (sample_{param} key)
SEE_PARAMS = ['temperature', 'magnetic_field', 'current', 'voltage', 'gas_pressure']
def __init__(self, fileName:Union[str, h5py.File, BinaryIO]):
if type(fileName) is str:
logging.warning(f' {fileName.split("/")[-1]}')
self.hdf = h5py.File(fileName, 'r', swmr=True)
elif type(fileName) is h5py.File:
self.hdf = fileName
else:
self.hdf = h5py.File(fileName, 'r')
self._log_keys = []
self.read_header_info()
self.read_instrument_configuration()
if type(fileName) is str:
# close the input file to free memory, only if the file was opened in this object
self.hdf.close()
del(self.hdf)
def _replace_if_missing(self, key, nicos_key, dtype=float, suffix=''):
from .nicos_interface import lookup_nicos_value
year = self.fileDate.strftime('%Y')
return lookup_nicos_value(key, nicos_key, dtype, suffix, year)
def rv(self, key, with_unit=False, default=NO_DEFAULT_VALUE):
"""
Generic read value methos based on key in hdf_paths dictionary.
If with_unit is True, return tuple of value, unit.
"""
# TODO: refactor method for better readibility/reliability
try:
hdf_path, dtype, *nicos = self.hdf_paths[key]
except KeyError:
if default is not NO_DEFAULT_VALUE:
logging.debug(f' Key {key} not present in file version, using default value')
# if default is given, ignore missing hdf_path for backward compatibility
if with_unit:
return default, ''
else:
return default
else:
raise
try:
hdfgrp = self.hdf[hdf_path]
if hdfgrp.attrs.get('NX_class', None) == 'NXlog':
# use the last value that was recoreded before the count started
time_column = hdfgrp['time'][:]
try:
start_index = np.where(time_column<=self._start_time_ns)[0][0]
except IndexError:
start_index = 0
if hdfgrp['value'].ndim==1:
output = dtype(hdfgrp['value'][start_index])
else:
output = dtype(hdfgrp['value'][start_index, 0])
# make sure key is only appended if no exception was raised
self._log_keys.append(key)
if with_unit:
unit = hdfgrp['value'].attrs.get('units', '')
return output, unit
else:
return output
elif hdfgrp.attrs.get('writer_module', None) == 'da00':
# da00 float-buffer stream data
return DA00EventBuffer(signal=hdfgrp['signal'][:], time=hdfgrp['time'][:])
elif dtype is str:
return self.read_string(hdf_path)
else:
if len(hdfgrp.shape)==1:
output = dtype(hdfgrp[0])
else:
output = dtype(hdfgrp[()])
if with_unit:
unit = hdfgrp.attrs.get('units', '')
return output, unit
else:
return output
except (KeyError, IndexError):
if nicos:
nicos_key = nicos[0]
suffix = nicos[1] if len(nicos)>1 else ''
output = self._replace_if_missing(key, nicos_key, dtype, suffix)
if with_unit:
return output, ''
else:
return output
elif default is not NO_DEFAULT_VALUE:
logging.debug(f' Using default value for {key}')
if with_unit:
return default, ''
else:
return default
else:
raise
def read_string(self, path):
if not np.shape(self.hdf[path]):
return self.hdf[path][()].decode('utf-8')
else:
# format until 2025
return self.hdf[path][0].decode('utf-8')
def read_header_info(self):
self._start_time_ns = np.uint64(0)
try:
start_time = self.rv('start_time')
except KeyError:
start_time = self.rv('start_time_fallback')
end_time = self.rv('end_time', default=None)
# extract start time as unix time, adding UTC offset of 1h to time string
if start_time.endswith('Z') and sys.version_info.minor<11:
# older python versions did not support Z format
start_time = start_time[:-1]
TZ = UTC
if end_time:
end_time = end_time[:-1]
else:
TZ = AMOR_LOCAL_TIMEZONE
start_date = datetime.fromisoformat(start_time)
end_date = datetime.fromisoformat(end_time) if end_time else None
self.fileDate = start_date.replace(tzinfo=TZ)
start_timestamp = self.fileDate.timestamp()
if start_timestamp<=compat.legacy_cutoffs[-1]:
for cutoff in compat.legacy_cutoffs:
if start_timestamp<=cutoff:
self.hdf_paths = compat.legacy_hdf_paths[cutoff]
break
cutoff_date = datetime.fromtimestamp(cutoff)
logging.info(f" Detected legacy file format, using older mapping from before {cutoff_date.strftime('%Y-%m-%d')}")
self._start_time_ns = np.uint64(start_timestamp*1e9)
# read general information and first data set
title = self.rv('title')
proposal_id = self.rv('proposal_id')
user_name = self.rv('user_name')
user_affiliation = self.rv('user_affiliation', default='unknown')
user_email = self.rv('user_email')
user_orcid = None
sampleName = self.rv('sample_name')
source = self.rv('source_name')
model = self.rv('sample_model')
if 'stack' in model:
model = yaml.safe_load(model)
else:
model = dict(stack=model)
self.owner = fileio.Person(
name=user_name,
affiliation=user_affiliation,
contact=user_email,
)
if user_orcid:
self.owner.orcid = user_orcid
self.experiment = fileio.Experiment(
title=title,
instrument='Amor',
start_date=start_date,
probe='neutron',
facility=source,
proposalID=proposal_id
)
if end_date:
self.experiment.end_date = end_date
if model['stack'] == '':
om = None
else:
om = SampleModel.from_dict(model)
self.sample = fileio.Sample(
name=sampleName,
model=om,
sample_parameters={},
)
sample_geometry = self.rv('sample_geometry', default=None)
if sample_geometry:
self.sample.geometry = yaml.safe_load(sample_geometry)
# while event times are not evaluated, use first value reported in file for SEE
for param in self.SEE_PARAMS:
key = f'sample_{param}'
value, unit = self.rv(key, with_unit=True, default=None)
if value is not None:
self.sample.sample_parameters[param] = fileio.Value(value, unit=unit)
def read_instrument_configuration(self):
chopperSeparation = self.rv('chopper_separation')
detectorDistance = self.rv('detector_distance')
chopperDistance = self.rv('chopper_distance')
chopperDetectorDistance = detectorDistance - chopperDistance
mu = self.rv('mu')
nu = self.rv('nu')
kap = self.rv('kap')
kad = self.rv('kad')
div = self.rv('div')
VS_H = self.rv('virtual_source_horizontal', default=-1.)
VS_V = self.rv('virtual_source_vertical', default=-1.)
ch1TriggerPhase = self.rv('ch1_trigger_phase')
ch2TriggerPhase = self.rv('ch2_trigger_phase')
try:
trigger_group = self.hdf[self.hdf_paths['trigger'][0]]
chopperTriggerTime = (float(trigger_group['event_time_zero'][7])
-float(trigger_group['event_time_zero'][0])) \
/7
chopperTriggerTimeDiff = float(trigger_group['event_time_offset'][2])
except (KeyError, IndexError):
logging.debug(' chopper speed and phase taken from .hdf file')
chopperSpeed = self.rv('chopper_speed')
chopperPhase = self.rv('chopper_phase')
tau = 30/chopperSpeed
else:
tau = int(1e-6*chopperTriggerTime/2+0.5)*(1e-3)
chopperTriggerPhase = 180e-9*chopperTriggerTimeDiff/tau
chopperSpeed = 30/tau
chopperPhase = chopperTriggerPhase+ch1TriggerPhase-ch2TriggerPhase
self.geometry = AmorGeometry(mu, nu, kap, kad, div,
chopperSeparation, detectorDistance, chopperDetectorDistance)
self.timing = AmorTiming(ch1TriggerPhase, ch2TriggerPhase, chopperSpeed, chopperPhase, tau)
polarizationConfigLabel = self.rv('polarization_config_label')
polarizationConfig = fileio.Polarization(const.polarizationConfigs[polarizationConfigLabel])
logging.debug(f' polarization configuration: {polarizationConfig} (index {polarizationConfigLabel})')
self.instrument_settings = fileio.InstrumentSettings(
incident_angle = fileio.ValueRange(round(mu+kap+kad-0.5*div, 3),
round(mu+kap+kad+0.5*div, 3),
'deg'),
wavelength = fileio.ValueRange(const.lamdaCut, const.lamdaMax, 'angstrom'),
polarization = fileio.Polarization(polarizationConfig)
)
self.instrument_settings.qz = fileio.ValueRange(round(4*np.pi*np.sin(np.deg2rad(mu+kap+kad-0.5*div))/const.lamdaMax, 3),
round(4*np.pi*np.sin(np.deg2rad(mu+kap+kad+0.5*div))/const.lamdaCut, 3),
'1/angstrom')
self.instrument_settings.mu = fileio.Value(
round(mu, 3),
'deg',
comment='sample angle to horizon')
self.instrument_settings.nu = fileio.Value(
round(nu, 3),
'deg',
comment='detector angle to horizon')
self.instrument_settings.div = fileio.Value(
round(div, 3),
'deg',
comment='incoming beam divergence')
self.instrument_settings.kap = fileio.Value(
round(kap, 3),
'deg',
comment='incoming beam inclination')
if abs(kad)>0.02:
self.instrument_settings.kad = fileio.Value(
round(kad, 3),
'deg',
comment='incoming beam angular offset')
if VS_H>0 and VS_V>0:
self.instrument_settings.virtual_source_horizontal = fileio.Value(VS_H, 'mm')
self.instrument_settings.virtual_source_vertical = fileio.Value(VS_V, 'mm')
def update_header(self, header:Header):
"""
Add dataset information into an existing header.
"""
logging.info(f' meta data from: {self.file_list[0]}')
header.owner = self.owner
header.experiment = self.experiment
header.sample = self.sample
header.measurement_instrument_settings = self.instrument_settings
class AmorEventData(AmorHeader):
"""
Read one amor NeXus datafile and extract relevant header information.
Implements EventDatasetProtocol
"""
file_list: List[str]
first_index: int
last_index: int = -1
EOF: bool = False
max_events: int
owner: fileio.Person
experiment: fileio.Experiment
sample: fileio.Sample
instrument_settings: fileio.InstrumentSettings
geometry: AmorGeometry
timing: AmorTiming
data: AmorEventStream
eventStartTime: np.int64
def __init__(self, fileName:Union[str, h5py.File, BinaryIO], first_index:int=0, max_events:int=100_000_000):
if type(fileName) is str:
logging.warning(f' {fileName.split("/")[-1]}')
self.file_list = [fileName]
hdf = h5py.File(fileName, 'r', swmr=True)
elif type(fileName) is h5py.File:
self.file_list = [fileName.filename]
hdf = fileName
else:
self.file_list = [repr(fileName)]
hdf = h5py.File(fileName, 'r')
self.first_index = first_index
self.max_events = max_events
super().__init__(hdf)
self.hdf = hdf
try:
self.read_event_stream()
except EOFError:
self.hdf.close()
del(self.hdf)
raise
self.read_log_streams()
if type(fileName) is str:
# close the input file to free memory, only if the file was opened in this object
self.hdf.close()
del(self.hdf)
def read_event_stream(self):
"""
Read the actual event data from file. If file is too large, find event index from packets
that allow splitting of file smaller than self.max_events.
"""
data_group = self.hdf[self.hdf_paths['data'][0]]
packets = np.recarray(data_group['event_index'].shape, dtype=PACKET_TYPE)
packets.start_index = data_group['event_index'][:]
packets.time = data_group['event_time_zero'][:]
try:
# packet index that matches first event index
start_packet = int(np.where(packets.start_index==self.first_index)[0][0])
except IndexError:
raise EOFError(f'No event packet found starting at event #{self.first_index}, '
f'number of events is {data_group["event_time_offset"].shape[0]}')
packets = packets[start_packet:]
if packets.shape[0]==0:
raise EOFError(f'No more packets left after start_packet filter')
nevts = data_group['event_time_offset'].shape[0]
if (nevts-self.first_index)>self.max_events:
end_packet = np.where(packets.start_index<=(self.first_index+self.max_events))[0][-1]
end_packet = max(1, end_packet)
if len(packets)==1:
self.last_index = nevts-1
else:
self.last_index = packets.start_index[end_packet]-1
packets = packets[:end_packet]
else:
self.last_index = nevts-1
self.EOF = True
if packets.shape[0]==0:
raise EOFError(f'No more packets left after end_packet filter, first_index={self.first_index}, '
f'max_events={self.max_events}, nevts={nevts}')
nevts = self.last_index+1-self.first_index
# adapte packet to event index relation
packets.start_index -= self.first_index
events = np.recarray(nevts, dtype=EVENT_TYPE)
events.tof = np.array(data_group['event_time_offset'][self.first_index:self.last_index+1])/1.e9
events.pixelID = data_group['event_id'][self.first_index:self.last_index+1]
events.mask = 0
self._beam_monitor = self.rv('beam_monitor', default=None)
pulses = self.read_chopper_trigger_stream(packets)
current = self.read_proton_current_stream(packets)
beam_monitor = self.read_proton_current_stream(packets)
# read parameter logs not present in old files to ensure they are in self._log_keys if they exist
_acquisition_filter = self.rv('acquisition_filter', default=None)
self.data = AmorEventStream(events, packets, pulses, current, beam_monitor)
if self.first_index>0 and not self.EOF:
# label the file name if not all events were used
self.file_list[0] += f'[{self.first_index}:{self.last_index}]'
def read_log_streams(self):
"""
Read the individual NXlog datasets that can later be used for filtering etc.
"""
for key in self._log_keys:
hdf_path, dtype, *_ = self.hdf_paths[key]
hdfgroup = self.hdf[hdf_path]
shape = hdfgroup['time'].shape
data = np.recarray(shape, dtype=np.dtype([('value', self.hdf_paths[key][1]), ('time', np.int64)]))
data.time = hdfgroup['time'][:]
if len(hdfgroup['value'].shape)==1:
data.value = hdfgroup['value'][:]
else:
data.value = hdfgroup['value'][:, 0]
self.data.device_logs[key] = data
def update_info_from_logs(self):
relevant_items = ['polarization_config_label']
sample_keys = [f'sample_{param}' for param in self.SEE_PARAMS]
relevant_items += sample_keys
for key, log in self.data.device_logs.items():
# if SE is missing there can be a missing key or empty dataset
if key not in relevant_items:
continue
if log.value.size == 0:
continue
if log.value.dtype in [np.int8, np.int16, np.int32, np.int64]:
# for integer items (flags) report the most common one
value = np.bincount(log.value).argmax()
if logging.getLogger().getEffectiveLevel() <= logging.DEBUG \
and np.unique(log.value).shape[0]>1:
logging.debug(f' filtered values for {key} not unique, '
f'has {np.unique(log.value).shape[0]} values')
else:
value = log.value.mean()
if key == 'polarization_config_label':
self.instrument_settings.polarization = fileio.Polarization(const.polarizationConfigs[value])
elif key in sample_keys:
param = key.split('_', 1)[1]
self.sample.sample_parameters[param].magnitue = value
def read_chopper_trigger_stream(self, packets):
trigger_group = self.hdf[self.hdf_paths['trigger'][0]]
chopper1TriggerTime = np.array(trigger_group['event_time_zero'][:-2], dtype=np.int64)
if np.shape(chopper1TriggerTime)[0] > 2:
startTime = chopper1TriggerTime[0]
pulseTimeS = chopper1TriggerTime
else:
logging.critical(' No chopper trigger data available, using event steram instead, pulse filtering will fail!')
startTime = np.array(self._beam_monitor.time[0], dtype=np.int64)
pulseTimeS = self._beam_monitor.time
pulses = np.recarray(pulseTimeS.shape, dtype=PULSE_TYPE)
pulses.time = pulseTimeS
pulses.monitor = 1. # default is monitor pulses as it requires no calculation
# apply filter in case the events were filtered
if (self.first_index>0 or not self.EOF):
pulses = pulses[(pulses.time>=packets.time[0])&(pulses.time<=packets.time[-1])]
self.eventStartTime = startTime
return pulses
def read_proton_current_stream(self, packets):
pc_group = self.hdf[self.hdf_paths['proton_current'][0]]
proton_current = np.recarray(pc_group['time'].shape, dtype=PC_TYPE)
proton_current.time = pc_group['time'][:]
if pc_group['value'].ndim==1:
proton_current.current = pc_group['value'][:]
else:
proton_current.current = pc_group['value'][:,0]
if self.first_index>0 or not self.EOF:
proton_current = proton_current[(proton_current.time>=packets.time[0])&
(proton_current.time<=packets.time[-1])]
return proton_current
def read_beam_monitor_stream(self, packets):
_signal = self._beam_monitor.signal
_time = self._beam_monitor.time
plse = np.array([0, 5, 6, 11])
base = np.array([2, 3, 8, 9])
i = 0
beam_monitor = np.recarray(shape=np.shape(packets.time)[0], dtype=BM_TYPE)
for tt, tme in enumerate(_time):
if (tme>=packets.time[0])&(tme<=packets.time[-1]):
beam_monitor.value[i] = _signal[tt][plse].sum() - _signal[tt][base].sum()
beam_monitor.time[i] = tme
i += 1
return beam_monitor[:i+1]
def info(self):
output = ""
for key in ['owner', 'experiment', 'sample', 'instrument_settings']:
value = repr(getattr(self, key)).replace("\n","\n ")
output += f'\n{key}={value},'
output += '\n'
return output
def append(self, other):
"""
Append event streams from another file to this one. Adjusts the event indices in the
packets to stay valid.
"""
new_events = np.concatenate([self.data.events, other.data.events]).view(np.recarray)
new_pulses = np.concatenate([self.data.pulses, other.data.pulses]).view(np.recarray)
new_proton_current = np.concatenate([self.data.proton_current, other.data.proton_current]).view(np.recarray)
new_beam_monitor = np.concatenate([self.data.beam_monitor, other.data.beam_monitor]).view(np.recarray)
new_packets = np.concatenate([self.data.packets, other.data.packets]).view(np.recarray)
new_packets.start_index[self.data.packets.shape[0]:] += self.data.events.shape[0]
self.data = AmorEventStream(new_events, new_packets, new_pulses, new_proton_current, new_beam_monitor)
# Indicate that this is a modified dataset, basically counts number of appends as negative indices
self.last_index = min(self.last_index-1, -1)
self.file_list += other.file_list
def __repr__(self):
output = (f"AmorEventData({self.file_list!r}) # {self.data.events.shape[0]} events, "
f"{self.data.pulses.shape[0]} pulses")
return output
def get_timeslice(self, start, end)->'AmorEventData':
# return a new dataset with just events that occured in given time slice
if not 'wallTime' in self.data.events.dtype.names:
raise ValueError("This dataset is missing a wallTime that is required for time slicing")
# convert from seconds to epoch integer values
start , end = start*1e9, end*1e9
event_filter = self.data.events.wallTime>=start
event_filter &= self.data.events.wallTime<end
pulse_filter = self.data.pulses.time>=start
pulse_filter &= self.data.pulses.time<end
output = super().__new__(AmorEventData)
for key, value in self.__dict__.items():
if key == 'data':
continue
else:
setattr(output, key, value)
# TODO: this is not strictly correct, as the packet/event relationship is lost
output.data = AmorEventStream(self.data.events[event_filter], self.data.packets,
self.data.pulses[pulse_filter], self.data.proton_current,
self.data.beam_monitor)
return output