Implement reading of values with NXlog as streams, put all hdf paths in dictionary and separate nicos calls to own module

This commit is contained in:
2026-02-24 16:36:46 +01:00
parent 26b6057941
commit bc84b2e841
3 changed files with 155 additions and 59 deletions

View File

@@ -1,8 +1,8 @@
"""
Specify the data type and protocol used for event datasets.
"""
from typing import List, Optional, Protocol, Tuple
from dataclasses import dataclass
from typing import Dict, List, Optional, Protocol, Tuple
from dataclasses import dataclass, field
from .header import Header
from abc import ABC, abstractmethod
from hashlib import sha256
@@ -34,6 +34,7 @@ EVENT_TYPE = np.dtype([('tof', np.float64), ('pixelID', np.uint32), ('mask', np.
PACKET_TYPE = np.dtype([('start_index', np.uint32), ('time', np.int64)])
PULSE_TYPE = np.dtype([('time', np.int64), ('monitor', np.float32)])
PC_TYPE = np.dtype([('current', np.float32), ('time', np.int64)])
LOG_TYPE = np.dtype([('value', np.float32), ('time', np.int64)])
# define the bitmask for individual event filters
EVENT_BITMASKS = {
@@ -60,6 +61,7 @@ class AmorEventStream:
packets: np.recarray # PACKET_TYPE
pulses: np.recarray # PULSE_TYPE
proton_current: np.recarray # PC_TYPE
device_logs: Dict[str, np.recarray] = field(default_factory=dict) # LOG_TYPE
class EventDatasetProtocol(Protocol):
"""

View File

@@ -5,7 +5,6 @@ from typing import BinaryIO, List, Union
import h5py
import numpy as np
import platform
import logging
import subprocess
@@ -16,7 +15,8 @@ from orsopy.fileio.model_language import SampleModel
from . import const
from .header import Header
from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, PC_TYPE
from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, LOG_TYPE, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, \
PC_TYPE
try:
import zoneinfo
@@ -28,16 +28,38 @@ except ImportError:
# Time zone used to interpret time strings
AMOR_LOCAL_TIMEZONE = zoneinfo.ZoneInfo(key='Europe/Zurich')
if platform.node().startswith('amor'):
NICOS_CACHE_DIR = '/home/data/nicosdata/cache/'
GREP = '/usr/bin/grep "value"'
else:
NICOS_CACHE_DIR = None
class AmorHeader:
"""
Collects header information from Amor NeXus file without reading event data.
"""
# mapping of names to (hdf_path, dtype, nicos_key[, suffix])
# Each value is a tuple:
#   hdf_path  - location of the dataset inside the NeXus file
#   dtype     - callable used to convert the raw value (str / float / int)
#   nicos_key - optional: nicos cache device name used as fallback when the
#               hdf entry is missing; 'ignore' means no cache lookup is done
#               and dtype(0) is returned instead
#   suffix    - optional: path suffix appended to the nicos cache lookup
hdf_paths = dict(
    title=('entry1/title', str),
    proposal_id=('entry1/proposal_id', str),
    user_name=('entry1/user/name', str),
    user_email=('entry1/user/email', str),
    sample_name=('entry1/sample/name', str),
    source_name=('entry1/Amor/source/name', str),
    sample_model=('entry1/sample/model', str),
    start_time=('entry1/start_time', str),
    chopper_separation=('entry1/Amor/chopper/pair_separation', float),
    detector_distance=('entry1/Amor/detector/transformation/distance', float),
    chopper_distance=('entry1/Amor/chopper/distance', float),
    # sample environment values have no nicos fallback device
    sample_temperature=('entry1/sample/temperature', float, 'ignore'),
    sample_magnetic_field=('entry1/sample/magnetic_field', float, 'ignore'),
    mu=('entry1/Amor/instrument_control_parameters/mu', float, 'mu'),
    nu=('entry1/Amor/instrument_control_parameters/nu', float, 'nu'),
    kap=('entry1/Amor/instrument_control_parameters/kappa', float, 'kappa'),
    kad=('entry1/Amor/instrument_control_parameters/kappa_offset', float, 'kappa_offset'),
    div=('entry1/Amor/instrument_control_parameters/div', float, 'div'),
    ch1_trigger_phase=('entry1/Amor/chopper/ch1_trigger_phase', float, 'ch1_trigger_phase'),
    ch2_trigger_phase=('entry1/Amor/chopper/ch2_trigger_phase', float, 'ch2_trigger_phase'),
    # NOTE(review): chopper_speed falls back to nicos key 'chopper_phase';
    # this matches the previous code, but confirm it is intentional
    chopper_speed=('entry1/Amor/chopper/rotation_speed', float, 'chopper_phase'),
    chopper_phase=('entry1/Amor/chopper/phase', float, 'chopper_phase'),
    polarization_config_label=('entry1/Amor/polarization/configuration', int, 'polarization_config_label', '/*'),
)
def __init__(self, fileName:Union[str, h5py.File, BinaryIO]):
if type(fileName) is str:
@@ -48,6 +70,8 @@ class AmorHeader:
else:
self.hdf = h5py.File(fileName, 'r')
self._log_keys = []
self.read_header_info()
self.read_instrument_configuration()
@@ -57,52 +81,74 @@ class AmorHeader:
del(self.hdf)
def _replace_if_missing(self, key, nicos_key, dtype=float, suffix=''):
try:
return dtype(self.hdf[f'/entry1/Amor/{key}'][0])
except(KeyError, IndexError):
if NICOS_CACHE_DIR:
try:
logging.info(f" using parameter {nicos_key} from nicos cache")
year_date = self.fileDate.strftime('%Y')
call = f'{GREP} {NICOS_CACHE_DIR}nicos-{nicos_key}/{year_date}{suffix}'
value = str(subprocess.getoutput(call)).split('\t')[-1]
return dtype(value)
except Exception:
logging.error(f"Couldn't get value from nicos cache {nicos_key}, {call}")
return dtype(0)
else:
logging.warning(f" parameter {key} not found, relpace by zero")
return dtype(0)
from .nicos_interface import lookup_nicos_value
year = self.fileDate.strftime('%Y')
return lookup_nicos_value(key, nicos_key, dtype, suffix, year)
def get_hdf_single_entry(self, path):
if not np.shape(self.hdf['entry1/title']):
def rv(self, key):
    """
    Generic read value method based on key in hdf_paths dictionary.

    Looks up ``key`` in :attr:`hdf_paths` and reads the dataset from the
    hdf file. NXlog entries are sampled at the count start time and the
    key is remembered in ``self._log_keys`` so the full time series can be
    read later. Strings are decoded via :meth:`read_string`. When the hdf
    entry is missing and a nicos fallback key is configured, the value is
    looked up from the nicos cache instead; otherwise the original
    ``KeyError``/``IndexError`` is re-raised.
    """
    hdf_path, dtype, *nicos = self.hdf_paths[key]
    try:
        hdfgrp = self.hdf[hdf_path]
        if hdfgrp.attrs.get('NX_class', None) == 'NXlog':
            # remember the key so read_log_streams can collect the series
            self._log_keys.append(key)
            # use the last value that was recorded before the count started
            time_column = hdfgrp['time'][:]
            try:
                # BUGFIX: was [0][0] (first sample overall); the last sample
                # at or before the start time is wanted here
                start_index = np.where(time_column <= self._start_time_ns)[0][-1]
            except IndexError:
                # no sample precedes the start time, fall back to the first
                start_index = 0
            if hdfgrp['value'].ndim == 1:
                return dtype(hdfgrp['value'][start_index])
            else:
                return dtype(hdfgrp['value'][start_index, 0])
        elif dtype is str:
            return self.read_string(hdf_path)
        else:
            return dtype(hdfgrp[0])
    except (KeyError, IndexError):
        if nicos:
            nicos_key = nicos[0]
            suffix = nicos[1] if len(nicos) > 1 else ''
            return self._replace_if_missing(key, nicos_key, dtype, suffix)
        else:
            raise
def read_string(self, path):
    """Return the dataset at *path* decoded as a UTF-8 string."""
    dataset = self.hdf[path]
    if np.shape(dataset):
        # format until 2025: string stored as first element of an array
        raw = dataset[0]
    else:
        # newer format: scalar string dataset
        raw = dataset[()]
    return raw.decode('utf-8')
def read_header_info(self):
self._start_time_ns = np.uint64(0)
start_time = self.rv('start_time')
# extract start time as unix time, adding UTC offset of 1h to time string
start_date = datetime.fromisoformat(start_time)
self.fileDate = start_date.replace(tzinfo=AMOR_LOCAL_TIMEZONE)
self._start_time_ns = np.uint64(self.fileDate.timestamp()*1e9)
# read general information and first data set
title = self.get_hdf_single_entry('entry1/title')
proposal_id = self.get_hdf_single_entry('entry1/proposal_id')
user_name = self.get_hdf_single_entry('entry1/user/name')
title = self.rv('title')
proposal_id = self.rv('proposal_id')
user_name = self.rv('user_name')
user_affiliation = 'unknown'
user_email = self.get_hdf_single_entry('entry1/user/email')
user_email = self.rv('user_email')
user_orcid = None
sampleName = self.get_hdf_single_entry('entry1/sample/name')
sampleName = self.rv('sample_name')
instrumentName = 'Amor'
source = self.get_hdf_single_entry('entry1/Amor/source/name')
source = self.rv('source_name')
sourceProbe = 'neutron'
model = self.get_hdf_single_entry('entry1/sample/model')
model = self.rv('sample_model')
if 'stack' in model:
import yaml
model = yaml.safe_load(model)
else:
model = dict(stack=model)
start_time = self.get_hdf_single_entry('entry1/start_time')
# extract start time as unix time, adding UTC offset of 1h to time string
start_date = datetime.fromisoformat(start_time)
self.fileDate = start_date.replace(tzinfo=AMOR_LOCAL_TIMEZONE)
self.owner = fileio.Person(
name=user_name,
@@ -130,28 +176,28 @@ class AmorHeader:
sample_parameters={},
)
# while event times are not evaluated, use average_value reported in file for SEE
if self.hdf['entry1/sample'].get('temperature', None) is not None \
and float(self.hdf['entry1/sample/temperature/average_value'][0])>0:
self.sample.sample_parameters['temperature'] = fileio.Value(
float(self.hdf['entry1/sample/temperature/average_value'][0]), unit='K')
if self.hdf['entry1/sample'].get('temperature', None) is not None:
sample_temperature = self.rv('sample_temperature')
self.sample.sample_parameters['temperature'] = fileio.Value(sample_temperature, unit='K')
if self.hdf['entry1/sample'].get('magnetic_field', None) is not None:
self.sample.sample_parameters['magnetic_field'] = fileio.Value(
float(self.hdf['entry1/sample/magnetic_field/average_value'][0]), unit='T')
sample_magnetic_field = self.rv('sample_magnetic_field')
self.sample.sample_parameters['magnetic_field'] = fileio.Value(sample_magnetic_field, unit='T')
def read_instrument_configuration(self):
chopperSeparation = float(np.take(self.hdf['entry1/Amor/chopper/pair_separation'], 0))
detectorDistance = float(np.take(self.hdf['entry1/Amor/detector/transformation/distance'], 0))
chopperDetectorDistance = detectorDistance-float(np.take(self.hdf['entry1/Amor/chopper/distance'], 0))
chopperSeparation = self.rv('chopper_separation')
detectorDistance = self.rv('detector_distance')
chopperDistance = self.rv('chopper_distance')
chopperDetectorDistance = detectorDistance - chopperDistance
polarizationConfigs = ['unpolarized', 'unpolarized', 'po', 'mo', 'op', 'pp', 'mp', 'om', 'pm', 'mm']
mu = self._replace_if_missing('instrument_control_parameters/mu', 'mu', float)
nu = self._replace_if_missing('instrument_control_parameters/nu', 'nu', float)
kap = self._replace_if_missing('instrument_control_parameters/kappa', 'kappa', float)
kad = self._replace_if_missing('instrument_control_parameters/kappa_offset', 'kappa_offset', float)
div = self._replace_if_missing('instrument_control_parameters/div', 'div', float)
ch1TriggerPhase = self._replace_if_missing('chopper/ch1_trigger_phase', 'ch1_trigger_phase', float)
ch2TriggerPhase = self._replace_if_missing('chopper/ch2_trigger_phase', 'ch2_trigger_phase', float)
mu = self.rv('mu')
nu = self.rv('nu')
kap = self.rv('kap')
kad = self.rv('kad')
div = self.rv('div')
ch1TriggerPhase = self.rv('ch1_trigger_phase')
ch2TriggerPhase = self.rv('ch2_trigger_phase')
try:
chopperTriggerTime = (float(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time_zero'][7]) \
-float(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time_zero'][0])) \
@@ -159,8 +205,8 @@ class AmorHeader:
chopperTriggerTimeDiff = float(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time_offset'][2])
except (KeyError, IndexError):
logging.debug(' chopper speed and phase taken from .hdf file')
chopperSpeed = self._replace_if_missing('chopper/rotation_speed', 'chopper_phase', float)
chopperPhase = self._replace_if_missing('chopper/phase', 'chopper_phase', float)
chopperSpeed = self.rv('chopper_speed')
chopperPhase = self.rv('chopper_phase')
tau = 30/chopperSpeed
else:
tau = int(1e-6*chopperTriggerTime/2+0.5)*(1e-3)
@@ -172,7 +218,7 @@ class AmorHeader:
chopperSeparation, detectorDistance, chopperDetectorDistance)
self.timing = AmorTiming(ch1TriggerPhase, ch2TriggerPhase, chopperSpeed, chopperPhase, tau)
polarizationConfigLabel = self._replace_if_missing('polarization/configuration/average_value', 'polarization_config_label', int, suffix='/*')
polarizationConfigLabel = self.rv('polarization_config_label')
polarizationConfig = fileio.Polarization(polarizationConfigs[polarizationConfigLabel])
logging.debug(f' polarization configuration: {polarizationConfig} (index {polarizationConfigLabel})')
@@ -259,6 +305,7 @@ class AmorEventData(AmorHeader):
super().__init__(hdf)
self.hdf = hdf
self.read_event_stream()
self.read_log_streams()
if type(fileName) is str:
# close the input file to free memory, only if the file was opened in this object
@@ -308,6 +355,22 @@ class AmorEventData(AmorHeader):
# label the file name if not all events were used
self.file_list[0] += f'[{self.first_index}:{self.last_index}]'
def read_log_streams(self):
    """
    Read the individual NXlog datasets that can later be used for filtering etc.

    For every key collected in ``self._log_keys`` the 'time' and 'value'
    columns are copied into a LOG_TYPE recarray and stored under the same
    key in ``self.data.device_logs``.
    """
    for key in self._log_keys:
        hdf_path = self.hdf_paths[key][0]
        group = self.hdf[hdf_path]
        times = group['time'][:]
        values = group['value']
        stream = np.recarray(times.shape, dtype=LOG_TYPE)
        stream.time = times
        # multi-column logs: keep only the first value column
        stream.value = values[:] if len(values.shape) == 1 else values[:, 0]
        self.data.device_logs[key] = stream
def read_chopper_trigger_stream(self, packets):
chopper1TriggerTime = np.array(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time_zero'][:-2], dtype=np.int64)
#self.chopper2TriggerTime = self.chopper1TriggerTime + np.array(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time'][:-2], dtype=np.int64)
@@ -316,7 +379,7 @@ class AmorEventData(AmorHeader):
startTime = chopper1TriggerTime[0]
pulseTimeS = chopper1TriggerTime
else:
logging.warn(' no chopper trigger data available, using event steram instead')
logging.warning(' no chopper trigger data available, using event steram instead')
startTime = np.array(self.hdf['/entry1/Amor/detector/data/event_time_zero'][0], dtype=np.int64)
stopTime = np.array(self.hdf['/entry1/Amor/detector/data/event_time_zero'][-2], dtype=np.int64)
pulseTimeS = np.arange(startTime, stopTime, self.timing.tau*1e9, dtype=np.int64)

31
eos/nicos_interface.py Normal file
View File

@@ -0,0 +1,31 @@
"""
Functions used to directly access information from nicos.
"""
import platform
import logging
import subprocess
if platform.node().startswith('amor'):
NICOS_CACHE_DIR = '/home/data/nicosdata/cache/'
GREP = '/usr/bin/grep "value"'
else:
NICOS_CACHE_DIR = None
def lookup_nicos_value(key, nicos_key, dtype=float, suffix='', year="2026"):
# TODO: Implement direct communication to nicos to read the cache
if nicos_key=='ignore':
return dtype(0)
if NICOS_CACHE_DIR:
try:
logging.info(f" using parameter {nicos_key} from nicos cache")
call = f'{GREP} {NICOS_CACHE_DIR}nicos-{nicos_key}/{year}{suffix}'
value = str(subprocess.getoutput(call)).split('\t')[-1]
return dtype(value)
except Exception:
logging.error(f"Couldn't get value from nicos cache {nicos_key}, {call}")
return dtype(0)
else:
logging.warning(f" parameter {key} not found, relpace by zero")
return dtype(0)