23 Commits

Author SHA1 Message Date
20d3bf45c9 Release changed version with log value filters
Some checks failed
Release / test (3.10) (push) Has been cancelled
Release / test (3.11) (push) Has been cancelled
Release / test (3.12) (push) Has been cancelled
Release / test (3.13) (push) Has been cancelled
Release / test (3.8) (push) Has been cancelled
Release / test (3.9) (push) Has been cancelled
Release / build-ubuntu-latest (push) Has been cancelled
Release / build-windows (push) Has been cancelled
Release / release (push) Has been cancelled
2026-02-26 10:16:42 +01:00
f07a06370f React to both yz and toz stop commands 2026-02-26 10:15:25 +01:00
7c951a2f14 React to both yz and toz stop commands 2026-02-25 16:30:57 +01:00
493095331f revert change and show critical message instead, as chopper triggers are actually needed 2026-02-25 15:59:02 +01:00
c78d5412d5 replace recreation of pulse times based on packet time differences 2026-02-25 15:50:37 +01:00
389d485476 try to fix error if packets are larger then max_events
Some checks failed
Unit Testing / test (3.10) (push) Has been cancelled
Unit Testing / test (3.11) (push) Has been cancelled
Unit Testing / test (3.12) (push) Has been cancelled
Unit Testing / test (3.8) (push) Has been cancelled
Unit Testing / test (3.9) (push) Has been cancelled
2026-02-25 15:11:10 +01:00
9db1d399dc minor 2026-02-25 15:01:30 +01:00
9facb0e04f check start/end packet filter 2026-02-25 14:52:58 +01:00
198fc07421 minor 2026-02-25 14:42:35 +01:00
80ec3da039 minor 2026-02-25 14:30:03 +01:00
09ea513aad try fixing empty packets error when reading new files 2026-02-25 14:24:13 +01:00
4ecf6252a9 minor testing 2026-02-25 14:08:33 +01:00
0cb7f5ceb2 allow fallback for start time if writing
Some checks failed
Unit Testing / test (3.10) (push) Has been cancelled
Unit Testing / test (3.11) (push) Has been cancelled
Unit Testing / test (3.12) (push) Has been cancelled
Unit Testing / test (3.8) (push) Has been cancelled
Unit Testing / test (3.9) (push) Has been cancelled
2026-02-25 13:29:59 +01:00
2df7fd9e7c Minor fixes for new changes in data format 2026-02-25 13:17:07 +01:00
42234d86fd Add nicos request via socket as first alternative looking up devices
Some checks failed
Unit Testing / test (3.8) (push) Has been cancelled
Unit Testing / test (3.9) (push) Has been cancelled
Unit Testing / test (3.10) (push) Has been cancelled
Unit Testing / test (3.11) (push) Has been cancelled
Unit Testing / test (3.12) (push) Has been cancelled
2026-02-25 08:36:16 +01:00
bc84b2e841 Implement reading of values with NXlog as streams, put all hdf paths in dictionary and separtate nicos calls to own module 2026-02-24 16:36:46 +01:00
26b6057941 Try fixing test failure due to floating point precision issue
Some checks failed
Unit Testing / test (3.10) (push) Has been cancelled
Unit Testing / test (3.11) (push) Has been cancelled
Unit Testing / test (3.12) (push) Has been cancelled
Unit Testing / test (3.8) (push) Has been cancelled
Unit Testing / test (3.9) (push) Has been cancelled
2026-02-24 08:27:57 +01:00
1d8fea7498 Add further tests for all other event actions 2026-02-23 16:30:54 +01:00
d2fff51787 Start adding specific tests for event handling actions 2026-02-23 15:30:25 +01:00
8347942c15 Fix qzRange being ignored in filtering if high value not below 0.5 2026-02-23 10:11:33 +01:00
f55d840b8e new version
Some checks failed
Release / test (3.10) (push) Has been cancelled
Release / test (3.11) (push) Has been cancelled
Release / test (3.12) (push) Has been cancelled
Release / test (3.13) (push) Has been cancelled
Release / test (3.8) (push) Has been cancelled
Release / test (3.9) (push) Has been cancelled
Release / build-ubuntu-latest (push) Has been cancelled
Release / build-windows (push) Has been cancelled
Release / release (push) Has been cancelled
2026-02-20 13:57:50 +01:00
327d6a0ff8 Fix indentation error 2026-02-20 13:53:54 +01:00
22943bc513 Run tests before any release 2026-02-20 13:51:07 +01:00
14 changed files with 860 additions and 110 deletions

View File

@@ -22,7 +22,36 @@ on:
- all_incl_release
jobs:
test:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13']
fail-fast: false
steps:
- uses: actions/checkout@v4
with:
lfs: 'true'
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest
pip install -r requirements.txt
- name: Test with pytest
run: |
python -m pytest tests
build-ubuntu-latest:
needs: [test]
runs-on: ubuntu-latest
if: ${{ (github.event_name != 'workflow_dispatch') || (contains(fromJson('["all", "linux", "all_incl_release"]'), github.event.inputs.build-items)) }}
permissions:
@@ -55,6 +84,7 @@ jobs:
skip-existing: true
build-windows:
needs: [test]
runs-on: windows-latest
if: ${{ (github.event_name != 'workflow_dispatch') || (contains(fromJson('["all", "windows", "all_incl_release"]'), github.event.inputs.build-items)) }}

View File

@@ -1,38 +1,38 @@
name: Unit Testing
on:
push:
branches:
- main
pull_request:
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
build:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
fail-fast: false
steps:
- uses: actions/checkout@v4
with:
lfs: 'true'
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest
pip install -r requirements.txt
- name: Test with pytest
run: |
python -m pytest tests
name: Unit Testing
on:
push:
branches:
- main
pull_request:
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
test:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
fail-fast: false
steps:
- uses: actions/checkout@v4
with:
lfs: 'true'
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest
pip install -r requirements.txt
- name: Test with pytest
run: |
python -m pytest tests

View File

@@ -2,5 +2,5 @@
Package to handle data redction at AMOR instrument to be used by __main__.py script.
"""
__version__ = '3.1.0'
__date__ = '2026-02-20'
__version__ = '3.2.0'
__date__ = '2026-02-26'

View File

@@ -124,5 +124,60 @@ class FilterQzRange(EventDataAction):
if not 'qz' in dataset.data.events.dtype.names:
raise ValueError("FilterQzRange requires dataset with qz values per events, perform WavelengthAndQ first")
if self.qzRange[1]<0.5:
d.events.mask += EVENT_BITMASKS["qRange"]*((self.qzRange[0]>d.events.qz) | (d.events.qz>self.qzRange[1]))
d.events.mask += EVENT_BITMASKS["qRange"]*((self.qzRange[0]>d.events.qz) | (d.events.qz>self.qzRange[1]))
class FilterByLog(EventDataAction):
def __init__(self, filter_string):
self.filter_string = filter_string
def perform_action(self, dataset: EventDatasetProtocol) -> None:
filter_variable = None
# go through existing keys in reverse order of their length to insure a name containing another is used
existing_keys = list(dataset.data.device_logs.keys())
existing_keys.sort(key=lambda x: -len(x))
for key in existing_keys:
if key in self.filter_string:
filter_variable = key
break
if filter_variable is None:
logging.warning(f' could not find suitable parameter to filter on in {self.filter_string}, '
f'available parameters are: {list(sorted(existing_keys))}')
return
logging.debug(f' using parameter {filter_variable} to apply filter {self.filter_string}')
if not filter_variable in EVENT_BITMASKS:
EVENT_BITMASKS[filter_variable] = max(EVENT_BITMASKS.values())*2
if not filter_variable in dataset.data.pulses.dtype.names:
# interpolate the parameter values for all existing pulses
self.add_log_to_pulses(filter_variable, dataset)
fltr_pulses = eval(self.filter_string, {filter_variable: dataset.data.pulses[filter_variable]})
goodTimeS = dataset.data.pulses.time[fltr_pulses]
filter_e = np.logical_not(np.isin(dataset.data.events.wallTime, goodTimeS))
dataset.data.events.mask += EVENT_BITMASKS[filter_variable]*filter_e
def add_log_to_pulses(self, key, dataset: EventDatasetProtocol):
"""
Add a log value for each pulse to the pulses array.
"""
# TODO: perform some interpolation for the pulse where a change occured
pulses = dataset.data.pulses
log_data = dataset.data.device_logs[key]
if log_data.time[0]>0:
logTimeS = np.hstack([[0], log_data.time, [pulses.time[-1]+1]])
logValues = np.hstack([[log_data.value[0]], log_data.value])
else:
logTimeS = np.hstack([log_data.time, [pulses.time[-1]+1]])
logValues = log_data.value
pulseLogS = np.zeros(pulses.time.shape[0], dtype=log_data.value.dtype)
j = 0
for i, ti in enumerate(pulses.time):
# find the last current item that was before this pulse
while ti>=logTimeS[j+1]:
j += 1
pulseLogS[i] = logValues[j]
pulses = append_fields(pulses, [(key, pulseLogS.dtype)])
pulses[key] = pulseLogS
dataset.data.pulses = pulses

View File

@@ -1,8 +1,8 @@
"""
Specify the data type and protocol used for event datasets.
"""
from typing import List, Optional, Protocol, Tuple
from dataclasses import dataclass
from typing import Dict, List, Optional, Protocol, Tuple
from dataclasses import dataclass, field
from .header import Header
from abc import ABC, abstractmethod
from hashlib import sha256
@@ -34,6 +34,7 @@ EVENT_TYPE = np.dtype([('tof', np.float64), ('pixelID', np.uint32), ('mask', np.
PACKET_TYPE = np.dtype([('start_index', np.uint32), ('time', np.int64)])
PULSE_TYPE = np.dtype([('time', np.int64), ('monitor', np.float32)])
PC_TYPE = np.dtype([('current', np.float32), ('time', np.int64)])
LOG_TYPE = np.dtype([('value', np.float32), ('time', np.int64)])
# define the bitmask for individual event filters
EVENT_BITMASKS = {
@@ -60,6 +61,7 @@ class AmorEventStream:
packets: np.recarray # PACKET_TYPE
pulses: np.recarray # PULSE_TYPE
proton_current: np.recarray # PC_TYPE
device_logs: Dict[str, np.recarray] = field(default_factory=dict) # LOG_TYPE
class EventDatasetProtocol(Protocol):
"""

View File

@@ -71,6 +71,8 @@ class CorrectSeriesTime(EventDataAction):
dataset.data.pulses.time -= self.seriesStartTime
dataset.data.events.wallTime -= self.seriesStartTime
dataset.data.proton_current.time -= self.seriesStartTime
for value in dataset.data.device_logs.values():
value.time -= self.seriesStartTime
start, stop = dataset.data.events.wallTime[0], dataset.data.events.wallTime[-1]
logging.debug(f' wall time from {start/1e9:6.1f} s to {stop/1e9:6.1f} s, '
f'series time = {self.seriesStartTime/1e9:6.1f}')

View File

@@ -5,7 +5,6 @@ from typing import BinaryIO, List, Union
import h5py
import numpy as np
import platform
import logging
import subprocess
@@ -16,7 +15,8 @@ from orsopy.fileio.model_language import SampleModel
from . import const
from .header import Header
from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, PC_TYPE
from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, LOG_TYPE, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, \
PC_TYPE
try:
import zoneinfo
@@ -28,16 +28,39 @@ except ImportError:
# Time zone used to interpret time strings
AMOR_LOCAL_TIMEZONE = zoneinfo.ZoneInfo(key='Europe/Zurich')
if platform.node().startswith('amor'):
NICOS_CACHE_DIR = '/home/data/nicosdata/cache/'
GREP = '/usr/bin/grep "value"'
else:
NICOS_CACHE_DIR = None
class AmorHeader:
"""
Collects header information from Amor NeXus fiel without reading event data.
"""
# mapping of names to (hdf_path, dtype, nicos_key[, suffix])
hdf_paths = dict(
title=('entry1/title', str),
proposal_id=('entry1/proposal_id', str),
user_name=('entry1/user/name', str),
user_email=('entry1/user/email', str),
sample_name=('entry1/sample/name', str),
source_name=('entry1/Amor/source/name', str),
sample_model=('entry1/sample/model', str),
start_time=('entry1/start_time', str),
start_time_fallback=('entry1/Amor/instrument_control_parameters/start_time', str),
chopper_separation=('entry1/Amor/chopper/pair_separation', float),
detector_distance=('entry1/Amor/detector/transformation/distance', float),
chopper_distance=('entry1/Amor/chopper/distance', float),
sample_temperature=('entry1/sample/temperature', float, 'ignore'),
sample_magnetic_field=('entry1/sample/magnetic_field', float, 'ignore'),
mu=('entry1/Amor/instrument_control_parameters/mu', float, 'mu'),
nu=('entry1/Amor/instrument_control_parameters/nu', float, 'nu'),
kap=('entry1/Amor/instrument_control_parameters/kappa', float, 'kappa'),
kad=('entry1/Amor/instrument_control_parameters/kappa_offset', float, 'kappa_offset'),
div=('entry1/Amor/instrument_control_parameters/div', float, 'div'),
ch1_trigger_phase=('entry1/Amor/chopper/ch1_trigger_phase', float, 'ch1_trigger_phase'),
ch2_trigger_phase=('entry1/Amor/chopper/ch2_trigger_phase', float, 'ch2_trigger_phase'),
chopper_speed=('entry1/Amor/chopper/rotation_speed', float, 'chopper_phase'),
chopper_phase=('entry1/Amor/chopper/phase', float, 'chopper_phase'),
polarization_config_label=('entry1/Amor/polarization/configuration', int, 'polarization_config_label', '/*'),
)
def __init__(self, fileName:Union[str, h5py.File, BinaryIO]):
if type(fileName) is str:
@@ -48,6 +71,8 @@ class AmorHeader:
else:
self.hdf = h5py.File(fileName, 'r')
self._log_keys = []
self.read_header_info()
self.read_instrument_configuration()
@@ -57,52 +82,81 @@ class AmorHeader:
del(self.hdf)
def _replace_if_missing(self, key, nicos_key, dtype=float, suffix=''):
try:
return dtype(self.hdf[f'/entry1/Amor/{key}'][0])
except(KeyError, IndexError):
if NICOS_CACHE_DIR:
try:
logging.info(f" using parameter {nicos_key} from nicos cache")
year_date = self.fileDate.strftime('%Y')
call = f'{GREP} {NICOS_CACHE_DIR}nicos-{nicos_key}/{year_date}{suffix}'
value = str(subprocess.getoutput(call)).split('\t')[-1]
return dtype(value)
except Exception:
logging.error(f"Couldn't get value from nicos cache {nicos_key}, {call}")
return dtype(0)
else:
logging.warning(f" parameter {key} not found, relpace by zero")
return dtype(0)
from .nicos_interface import lookup_nicos_value
year = self.fileDate.strftime('%Y')
return lookup_nicos_value(key, nicos_key, dtype, suffix, year)
def get_hdf_single_entry(self, path):
if not np.shape(self.hdf['entry1/title']):
def rv(self, key):
"""
Generic read value methos based on key in hdf_paths dictionary.
"""
hdf_path, dtype, *nicos = self.hdf_paths[key]
try:
hdfgrp = self.hdf[hdf_path]
if hdfgrp.attrs.get('NX_class', None) == 'NXlog':
self._log_keys.append(key)
# use the last value that was recoreded before the count started
time_column = hdfgrp['time'][:]
try:
start_index = np.where(time_column<=self._start_time_ns)[0][0]
except IndexError:
start_index = 0
if hdfgrp['value'].ndim==1:
return dtype(hdfgrp['value'][start_index])
else:
return dtype(hdfgrp['value'][start_index, 0])
elif dtype is str:
return self.read_string(hdf_path)
else:
if len(hdfgrp.shape)==1:
return dtype(hdfgrp[0])
else:
return dtype(hdfgrp[()])
except (KeyError, IndexError):
if nicos:
nicos_key = nicos[0]
suffix = nicos[1] if len(nicos)>1 else ''
return self._replace_if_missing(key, nicos_key, dtype, suffix)
else:
raise
def read_string(self, path):
if not np.shape(self.hdf[path]):
return self.hdf[path][()].decode('utf-8')
else:
# format until 2025
# format until 2025
return self.hdf[path][0].decode('utf-8')
def read_header_info(self):
self._start_time_ns = np.uint64(0)
try:
start_time = self.rv('start_time')
except KeyError:
start_time = self.rv('start_time_fallback')
# extract start time as unix time, adding UTC offset of 1h to time string
start_date = datetime.fromisoformat(start_time)
self.fileDate = start_date.replace(tzinfo=AMOR_LOCAL_TIMEZONE)
self._start_time_ns = np.uint64(self.fileDate.timestamp()*1e9)
# read general information and first data set
title = self.get_hdf_single_entry('entry1/title')
proposal_id = self.get_hdf_single_entry('entry1/proposal_id')
user_name = self.get_hdf_single_entry('entry1/user/name')
title = self.rv('title')
proposal_id = self.rv('proposal_id')
user_name = self.rv('user_name')
user_affiliation = 'unknown'
user_email = self.get_hdf_single_entry('entry1/user/email')
user_email = self.rv('user_email')
user_orcid = None
sampleName = self.get_hdf_single_entry('entry1/sample/name')
sampleName = self.rv('sample_name')
instrumentName = 'Amor'
source = self.get_hdf_single_entry('entry1/Amor/source/name')
source = self.rv('source_name')
sourceProbe = 'neutron'
model = self.get_hdf_single_entry('entry1/sample/model')
model = self.rv('sample_model')
if 'stack' in model:
import yaml
model = yaml.safe_load(model)
else:
model = dict(stack=model)
start_time = self.get_hdf_single_entry('entry1/start_time')
# extract start time as unix time, adding UTC offset of 1h to time string
start_date = datetime.fromisoformat(start_time)
self.fileDate = start_date.replace(tzinfo=AMOR_LOCAL_TIMEZONE)
self.owner = fileio.Person(
name=user_name,
@@ -130,28 +184,28 @@ class AmorHeader:
sample_parameters={},
)
# while event times are not evaluated, use average_value reported in file for SEE
if self.hdf['entry1/sample'].get('temperature', None) is not None \
and float(self.hdf['entry1/sample/temperature/average_value'][0])>0:
self.sample.sample_parameters['temperature'] = fileio.Value(
float(self.hdf['entry1/sample/temperature/average_value'][0]), unit='K')
if self.hdf['entry1/sample'].get('temperature', None) is not None:
sample_temperature = self.rv('sample_temperature')
self.sample.sample_parameters['temperature'] = fileio.Value(sample_temperature, unit='K')
if self.hdf['entry1/sample'].get('magnetic_field', None) is not None:
self.sample.sample_parameters['magnetic_field'] = fileio.Value(
float(self.hdf['entry1/sample/magnetic_field/average_value'][0]), unit='T')
sample_magnetic_field = self.rv('sample_magnetic_field')
self.sample.sample_parameters['magnetic_field'] = fileio.Value(sample_magnetic_field, unit='T')
def read_instrument_configuration(self):
chopperSeparation = float(np.take(self.hdf['entry1/Amor/chopper/pair_separation'], 0))
detectorDistance = float(np.take(self.hdf['entry1/Amor/detector/transformation/distance'], 0))
chopperDetectorDistance = detectorDistance-float(np.take(self.hdf['entry1/Amor/chopper/distance'], 0))
chopperSeparation = self.rv('chopper_separation')
detectorDistance = self.rv('detector_distance')
chopperDistance = self.rv('chopper_distance')
chopperDetectorDistance = detectorDistance - chopperDistance
polarizationConfigs = ['unpolarized', 'unpolarized', 'po', 'mo', 'op', 'pp', 'mp', 'om', 'pm', 'mm']
mu = self._replace_if_missing('instrument_control_parameters/mu', 'mu', float)
nu = self._replace_if_missing('instrument_control_parameters/nu', 'nu', float)
kap = self._replace_if_missing('instrument_control_parameters/kappa', 'kappa', float)
kad = self._replace_if_missing('instrument_control_parameters/kappa_offset', 'kappa_offset', float)
div = self._replace_if_missing('instrument_control_parameters/div', 'div', float)
ch1TriggerPhase = self._replace_if_missing('chopper/ch1_trigger_phase', 'ch1_trigger_phase', float)
ch2TriggerPhase = self._replace_if_missing('chopper/ch2_trigger_phase', 'ch2_trigger_phase', float)
mu = self.rv('mu')
nu = self.rv('nu')
kap = self.rv('kap')
kad = self.rv('kad')
div = self.rv('div')
ch1TriggerPhase = self.rv('ch1_trigger_phase')
ch2TriggerPhase = self.rv('ch2_trigger_phase')
try:
chopperTriggerTime = (float(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time_zero'][7]) \
-float(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time_zero'][0])) \
@@ -159,8 +213,8 @@ class AmorHeader:
chopperTriggerTimeDiff = float(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time_offset'][2])
except (KeyError, IndexError):
logging.debug(' chopper speed and phase taken from .hdf file')
chopperSpeed = self._replace_if_missing('chopper/rotation_speed', 'chopper_phase', float)
chopperPhase = self._replace_if_missing('chopper/phase', 'chopper_phase', float)
chopperSpeed = self.rv('chopper_speed')
chopperPhase = self.rv('chopper_phase')
tau = 30/chopperSpeed
else:
tau = int(1e-6*chopperTriggerTime/2+0.5)*(1e-3)
@@ -172,7 +226,7 @@ class AmorHeader:
chopperSeparation, detectorDistance, chopperDetectorDistance)
self.timing = AmorTiming(ch1TriggerPhase, ch2TriggerPhase, chopperSpeed, chopperPhase, tau)
polarizationConfigLabel = self._replace_if_missing('polarization/configuration/average_value', 'polarization_config_label', int, suffix='/*')
polarizationConfigLabel = self.rv('polarization_config_label')
polarizationConfig = fileio.Polarization(polarizationConfigs[polarizationConfigLabel])
logging.debug(f' polarization configuration: {polarizationConfig} (index {polarizationConfigLabel})')
@@ -258,7 +312,13 @@ class AmorEventData(AmorHeader):
super().__init__(hdf)
self.hdf = hdf
self.read_event_stream()
try:
self.read_event_stream()
except EOFError:
self.hdf.close()
del(self.hdf)
raise
self.read_log_streams()
if type(fileName) is str:
# close the input file to free memory, only if the file was opened in this object
@@ -281,15 +341,26 @@ class AmorEventData(AmorHeader):
raise EOFError(f'No event packet found starting at event #{self.first_index}, '
f'number of events is {self.hdf["/entry1/Amor/detector/data/event_time_offset"].shape[0]}')
packets = packets[start_packet:]
if packets.shape[0]==0:
raise EOFError(f'No more packets left after start_packet filter')
nevts = self.hdf['/entry1/Amor/detector/data/event_time_offset'].shape[0]
if (nevts-self.first_index)>self.max_events:
end_packet = np.where(packets.start_index<=(self.first_index+self.max_events))[0][-1]
self.last_index = packets.start_index[end_packet]-1
end_packet = max(1, end_packet)
if len(packets)==1:
self.last_index = nevts-1
else:
self.last_index = packets.start_index[end_packet]-1
packets = packets[:end_packet]
else:
self.last_index = nevts-1
self.EOF = True
if packets.shape[0]==0:
raise EOFError(f'No more packets left after end_packet filter, first_index={self.first_index}, '
f'max_events={self.max_events}, nevts={nevts}')
nevts = self.last_index+1-self.first_index
# adapte packet to event index relation
@@ -308,6 +379,22 @@ class AmorEventData(AmorHeader):
# label the file name if not all events were used
self.file_list[0] += f'[{self.first_index}:{self.last_index}]'
def read_log_streams(self):
"""
Read the individual NXlog datasets that can later be used for filtering etc.
"""
for key in self._log_keys:
hdf_path, dtype, *_ = self.hdf_paths[key]
hdfgroup = self.hdf[hdf_path]
shape = hdfgroup['time'].shape
data = np.recarray(shape, dtype=LOG_TYPE)
data.time = hdfgroup['time'][:]
if len(hdfgroup['value'].shape)==1:
data.value = hdfgroup['value'][:]
else:
data.value = hdfgroup['value'][:, 0]
self.data.device_logs[key] = data
def read_chopper_trigger_stream(self, packets):
chopper1TriggerTime = np.array(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time_zero'][:-2], dtype=np.int64)
#self.chopper2TriggerTime = self.chopper1TriggerTime + np.array(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time'][:-2], dtype=np.int64)
@@ -316,7 +403,7 @@ class AmorEventData(AmorHeader):
startTime = chopper1TriggerTime[0]
pulseTimeS = chopper1TriggerTime
else:
logging.warn(' no chopper trigger data available, using event steram instead')
logging.critical(' No chopper trigger data available, using event steram instead, pulse filtering will fail!')
startTime = np.array(self.hdf['/entry1/Amor/detector/data/event_time_zero'][0], dtype=np.int64)
stopTime = np.array(self.hdf['/entry1/Amor/detector/data/event_time_zero'][-2], dtype=np.int64)
pulseTimeS = np.arange(startTime, stopTime, self.timing.tau*1e9, dtype=np.int64)
@@ -324,7 +411,7 @@ class AmorEventData(AmorHeader):
pulses.time = pulseTimeS
pulses.monitor = 1. # default is monitor pulses as it requires no calculation
# apply filter in case the events were filtered
if self.first_index>0 or not self.EOF:
if (self.first_index>0 or not self.EOF):
pulses = pulses[(pulses.time>=packets.time[0])&(pulses.time<=packets.time[-1])]
self.eventStartTime = startTime
return pulses
@@ -332,7 +419,11 @@ class AmorEventData(AmorHeader):
def read_proton_current_stream(self, packets):
proton_current = np.recarray(self.hdf['entry1/Amor/detector/proton_current/time'].shape, dtype=PC_TYPE)
proton_current.time = self.hdf['entry1/Amor/detector/proton_current/time'][:]
proton_current.current = self.hdf['entry1/Amor/detector/proton_current/value'][:,0]
if self.hdf['entry1/Amor/detector/proton_current/value'].ndim==1:
proton_current.current = self.hdf['entry1/Amor/detector/proton_current/value'][:]
else:
proton_current.current = self.hdf['entry1/Amor/detector/proton_current/value'][:,0]
if self.first_index>0 or not self.EOF:
proton_current = proton_current[(proton_current.time>=packets.time[0])&
(proton_current.time<=packets.time[-1])]

View File

@@ -160,7 +160,7 @@ class ESSSerializer:
)
self.producer.flush()
if isinstance(command, Stop):
if command.hist_id == self._active_histogram_yz:
if command.hist_id in [self._active_histogram_yz, self._active_histogram_tofz]:
self.count_stopped.set()
else:
return

60
eos/nicos_interface.py Normal file
View File

@@ -0,0 +1,60 @@
"""
Functions used to directly access information from nicos.
"""
import socket
import platform
import logging
import subprocess
ON_AMOR = platform.node().startswith('amor')
NICOS_CACHE_DIR = '/home/data/nicosdata/cache/'
GREP = '/usr/bin/grep "value"'
def lookup_nicos_value(key, nicos_key, dtype=float, suffix='', year="2026"):
# TODO: Implement direct communication to nicos to read the cache
if nicos_key=='ignore':
return dtype(0)
try:
logging.debug(f' trying socket request for device {nicos_key}')
response = nicos_single_request(nicos_key)
logging.info(f" using parameter {nicos_key} from nicos cache via socket")
return dtype(response)
except Exception as e:
logging.debug(f' socket request failed with {e!r}')
if ON_AMOR:
logging.debug(f" trying to extract {nicos_key} from nicos cache files")
call = f'{GREP} {NICOS_CACHE_DIR}nicos-{nicos_key}/{year}{suffix}'
try:
value = str(subprocess.getoutput(call)).split('\t')[-1]
logging.info(f" using parameter {nicos_key} from nicos cache file")
return dtype(value)
except Exception:
logging.error(f" couldn't get value from nicos cache {nicos_key}, {call}")
return dtype(0)
else:
logging.warning(f" parameter {key} not found, relpace by zero")
return dtype(0)
def nicos_single_request(device):
sentinel = b'\n'
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1.0)
s.connect(('amor', 14869))
tosend = f'@nicos/{device}/value?\n'
# write request
# self.log.debug("get_explicit: sending %r", tosend)
s.sendall(tosend.encode())
# read response
data = b''
while not data.endswith(sentinel):
newdata = s.recv(8192) # blocking read
if not newdata:
raise IOError('cache closed connection')
data += newdata
s.shutdown(socket.SHUT_RDWR)
return data.decode('utf-8').split('=')[-1]

View File

@@ -471,6 +471,15 @@ class ReflectivityReductionConfig(ArgParsable):
},
)
logfilter: List[str] = field(
default_factory=lambda: [],
metadata={
'short': 'lf',
'group': 'region of interest',
'help': 'filter using comparison to a log values, multpiple allowd (example "sample_temperature<150")',
},
)
class OutputFomatOption(StrEnum):
Rqz_ort = "Rqz.ort"

View File

@@ -268,7 +268,7 @@ class E2HReduction:
return
try:
# check that events exist in the new file
AmorEventData(new_files[-1], 0, max_events=1000)
AmorEventData(new_files[-1], 0, max_events=1_000)
except Exception:
logging.debug("Problem when trying to load new dataset", exc_info=True)
return

View File

@@ -68,7 +68,10 @@ class ReflectivityReduction:
self.dataevent_actions |= eh.TofTimeCorrection(self.config.experiment.incidentAngle==IncidentAngle.alphaF)
self.dataevent_actions |= ea.CalculateWavelength(self.config.experiment.lambdaRange)
self.dataevent_actions |= ea.CalculateQ(self.config.experiment.incidentAngle)
self.dataevent_actions |= ea.FilterQzRange(self.config.reduction.qzRange)
if not self.config.reduction.is_default('qzRange'):
self.dataevent_actions |= ea.FilterQzRange(self.config.reduction.qzRange)
for lf in self.config.reduction.logfilter:
self.dataevent_actions |= ea.FilterByLog(lf)
self.dataevent_actions |= eh.ApplyMask()
self.grid = LZGrid(self.config.reduction.qResolution, self.config.reduction.qzRange)

View File

@@ -0,0 +1,498 @@
import os
import numpy as np
from unittest import TestCase
from datetime import datetime
from copy import deepcopy
from orsopy.fileio import Person, Experiment, Sample, InstrumentSettings, Value, ValueRange, Polarization
from eos import const
from eos.header import Header
from eos.event_data_types import EVENT_BITMASKS, AmorGeometry, AmorTiming, AmorEventStream, \
EventDataAction, EventDatasetProtocol, PACKET_TYPE, PC_TYPE, PULSE_TYPE, EVENT_TYPE, append_fields
from eos.event_handling import ApplyPhaseOffset, ApplyParameterOverwrites, CorrectChopperPhase, CorrectSeriesTime, \
AssociatePulseWithMonitor, FilterMonitorThreshold, FilterStrangeTimes, TofTimeCorrection, ApplyMask
from eos.event_analysis import ExtractWalltime, MergeFrames, AnalyzePixelIDs, CalculateWavelength, CalculateQ, \
FilterQzRange
from eos.options import MonitorType, IncidentAngle, ExperimentConfig
class MockEventData:
"""
Simulated dataset to be used with event handling unit tests
"""
geometry: AmorGeometry
timing: AmorTiming
data: AmorEventStream
def __init__(self):
self.geometry = AmorGeometry(mu=2.0, nu=1.0, kap=0.5, kad=0.0, div=1.5,
chopperSeparation=1000.0, detectorDistance=4000., chopperDetectorDistance=18842.)
self.timing = AmorTiming(
ch1TriggerPhase=-9.1, ch2TriggerPhase=6.75,
chopperPhase=0.17, chopperSpeed=500., tau=0.06
)
self.create_data()
def create_data(self):
# list of events, here with random time of fligh and pixel location
events = np.recarray((10000, ), dtype=EVENT_TYPE)
events.tof = np.random.uniform(low=0., high=0.12, size=events.shape)
events.pixelID = np.random.randint(0, 28671, size=events.shape)
events.mask = 0
# list of data packates containing previous events
packets = np.recarray((1000,), dtype=PACKET_TYPE)
packets.start_index = np.linspace(0, events.shape[0]-1, packets.shape[0], dtype=np.uint32)
packets.time = np.linspace(1700000000000000000, 1700000000000000000+3_600_000,
packets.shape[0], dtype=np.int64)
# chopper pulses within the measurement time
pulses = np.recarray((packets.shape[0],), dtype=PULSE_TYPE)
pulses.monitor = 1.0
pulses.time = packets.time
# proton current information with independent timing
proton_current = np.recarray((50,), dtype=PC_TYPE)
proton_current.current = 1500.0
proton_current[np.random.randint(0, proton_current.shape[0]-1, 10)] = 0. # random time with no current
proton_current.time = np.linspace(1700000000000000300, 1700000000000000000+3_600_000,
proton_current.shape[0], dtype=np.int64)
self.data = AmorEventStream(events, packets, pulses, proton_current)
self.orig_data = deepcopy(self.data)
def append(self, other):
raise NotImplementedError("Just for testing, no append")
def update_header(self, header:Header):
# update a header with the information read from file
header.owner = Person(name="test user", affiliation='PSI')
header.experiment = Experiment(title='test experiment', instrument='amor',
start_date=datetime.now(), probe="neutron")
header.sample = Sample(name='test sample')
header.measurement_instrument_settings = InstrumentSettings(incident_angle=Value(1.5, 'deg'),
wavelength = ValueRange(3.0, 12.5, 'angstrom'),
polarization = Polarization.unpolarized)
class TestActionClass(TestCase):
@classmethod
def setUpClass(cls):
"""
Create test classes to be used
"""
class T1(EventDataAction):
def perform_action(self, event: EventDatasetProtocol):
event.data.events.mask += 1
class T2(EventDataAction):
def perform_action(self, event: EventDatasetProtocol):
event.data.events.mask += 2
class T4(EventDataAction):
def perform_action(self, event: EventDatasetProtocol):
event.data.events.mask += 4
cls.T1=T1; cls.T2=T2; cls.T4=T4
class H1(EventDataAction):
def perform_action(self, event: EventDatasetProtocol):
...
def update_header(self, header:Header) ->None:
header.sample.name = 'h1'
class H2(EventDataAction):
def perform_action(self, event: EventDatasetProtocol):
...
def update_header(self, header: Header) -> None:
header.sample.name = 'h2'
class HN(EventDataAction):
def __init__(self, n):
self._n = n
def perform_action(self, event: EventDatasetProtocol):
...
def update_header(self, header: Header) -> None:
header.sample.name = self._n
cls.H1=H1; cls.H2=H2; cls.HN = HN
def setUp(self):
self.d = MockEventData()
self.header = Header()
self.d.update_header(self.header)
def test_individual(self):
t1 = self.T1()
t2 = self.T2()
t4 = self.T4()
np.testing.assert_array_equal(self.d.data.events.mask, 0)
t1.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.events.mask, 1)
t2.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.events.mask, 3)
t4.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.events.mask, 7)
def test_header(self):
h1 = self.H1()
h2 = self.H2()
h3 = self.HN('h3')
h4 = self.HN('h4')
h1.update_header(self.header)
self.assertEqual(self.header.sample.name, 'h1')
h2.update_header(self.header)
self.assertEqual(self.header.sample.name, 'h2')
h3.update_header(self.header)
self.assertEqual(self.header.sample.name, 'h3')
h4.update_header(self.header)
self.assertEqual(self.header.sample.name, 'h4')
def test_combination(self):
t1 = self.T1()
t2 = self.T2()
t4 = self.T4()
t12 = t1 | t2
t24 = t2 | t4
t1224 = t1 | t2 | t2 | t4
t1224b = t12 | t24
np.testing.assert_array_equal(self.d.data.events.mask, 0)
t12.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.events.mask, 3)
t24.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.events.mask, 9)
t1224.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.events.mask, 18)
t1224b.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.events.mask, 27)
def test_combine_header(self):
h1 = self.H1()
h2 = self.H2()
h3 = self.HN('h3')
h4 = self.HN('h4')
(h1|h2).update_header(self.header)
self.assertEqual(self.header.sample.name, 'h2')
(h2|h1).update_header(self.header)
self.assertEqual(self.header.sample.name, 'h1')
(h3|h4).update_header(self.header)
self.assertEqual(self.header.sample.name, 'h4')
(h4|h3).update_header(self.header)
self.assertEqual(self.header.sample.name, 'h3')
def test_abstract_misssing(self):
with self.assertRaises(TypeError):
class E(EventDataAction):
...
_ = E()
def test_hash(self):
"""
Check that hashes of different actions are different but
instances of same action have same hash
"""
t1 = self.T1()
t1b = self.T1()
t2 = self.T2()
t4 = self.T4()
h3 = self.HN('h3')
h3b = self.HN('h3')
h4 = self.HN('h4')
self.assertNotEqual(t1.action_hash(), t2.action_hash())
self.assertNotEqual(t2.action_hash(), t4.action_hash())
self.assertNotEqual(t1.action_hash(), t4.action_hash())
self.assertNotEqual(h3.action_hash(), h4.action_hash())
self.assertEqual(t1.action_hash(), t1b.action_hash())
self.assertEqual(h3.action_hash(), h3b.action_hash())
class TestSimpleActions(TestCase):
def setUp(self):
self.d = MockEventData()
self.header = Header()
self.d.update_header(self.header)
def test_chopper_phase(self):
cp = CorrectChopperPhase()
cp.perform_action(self.d)
np.testing.assert_array_equal(
self.d.data.events.tof,
self.d.orig_data.events.tof+
self.d.timing.tau*(self.d.timing.ch1TriggerPhase-self.d.timing.chopperPhase/2)/180
)
def _extract_walltime(self):
# Extract wall time for events and orig copy
wt = ExtractWalltime()
d = self.d.data
self.d.data = self.d.orig_data
wt.perform_action(self.d)
self.d.data = d
wt.perform_action(self.d)
def test_extract_walltime(self):
self._extract_walltime()
# wallTime should be always a time present in the packet times
np.testing.assert_array_equal(np.isin(self.d.data.events.wallTime, self.d.data.packets.time), True)
# make sure extraction works on both original and copy
np.testing.assert_array_equal(self.d.data.events.wallTime, self.d.orig_data.events.wallTime)
def test_series_time(self):
corr = 100
ct = CorrectSeriesTime(corr)
with self.assertRaises(ValueError):
ct.perform_action(self.d)
self._extract_walltime()
ct.perform_action(self.d)
np.testing.assert_array_equal(
self.d.data.pulses.time,
self.d.orig_data.pulses.time-corr
)
np.testing.assert_array_equal(
self.d.data.events.wallTime,
self.d.orig_data.events.wallTime-corr
)
np.testing.assert_array_equal(
self.d.data.proton_current.time,
self.d.orig_data.proton_current.time-corr
)
def test_associate_monitor(self):
amPC = AssociatePulseWithMonitor(MonitorType.proton_charge)
amT = AssociatePulseWithMonitor(MonitorType.time)
amN = AssociatePulseWithMonitor(MonitorType.neutron_monitor)
self.d.data.pulses.monitor = 13
amN.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.pulses.monitor, 1)
self.d.data.pulses.monitor = 13
amT.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.pulses.monitor, np.float32(2*self.d.timing.tau))
self.d.data.pulses.monitor = 13
amPC.perform_action(self.d)
pcm = self.d.data.proton_current.current *2*self.d.timing.tau*1e-3
np.testing.assert_array_equal(np.isin(self.d.data.pulses.monitor, pcm), True)
def test_filter_monitor_threashold(self):
amPC = AssociatePulseWithMonitor(MonitorType.proton_charge)
fmt = amPC | FilterMonitorThreshold(1000.)
fma = amPC | FilterMonitorThreshold(2000.)
fm0 = amPC | FilterMonitorThreshold(-1.0)
with self.assertRaises(ValueError):
fmt.perform_action(self.d)
self._extract_walltime()
fm0.perform_action(self.d)
self.assertEqual(self.d.data.events.mask.sum(), 0)
fmt.perform_action(self.d)
# calculate, which events should have 0 monitor
zero_times = self.d.data.pulses.time[self.d.data.pulses.monitor==0]
zero_sum = np.isin(self.d.data.events.wallTime, zero_times).sum()
self.assertEqual(self.d.data.events.mask.sum(), zero_sum*EVENT_BITMASKS['MonitorThreshold'])
# filter all events
self.d.data.events.mask = 0
fma.perform_action(self.d)
self.assertEqual(self.d.data.events.mask.sum(), self.d.data.events.shape[0]*EVENT_BITMASKS['MonitorThreshold'])
def test_filter_strage_times(self):
st = FilterStrangeTimes()
st.perform_action(self.d)
self.assertEqual(self.d.data.events.mask.sum(), 0)
# half events should be strange times (outside of ToF frame)
self.d.data.events.tof += self.d.timing.tau
st.perform_action(self.d)
self.assertEqual(self.d.data.events.mask.sum(),
(self.d.data.events.tof>2*self.d.timing.tau).sum()*EVENT_BITMASKS['StrangeTimes'])
def test_apply_phase_offset(self):
action = ApplyPhaseOffset(12.5)
action.perform_action(self.d)
self.assertEqual(self.d.timing.ch1TriggerPhase, 12.5)
def test_apply_parameter_overwrites(self):
action = ApplyParameterOverwrites(ExperimentConfig(muOffset=0.25, mu=3.5, nu=4.5))
action.perform_action(self.d)
self.assertEqual(self.d.geometry.mu, 3.5)
self.assertEqual(self.d.geometry.nu, 4.5)
action = ApplyParameterOverwrites(ExperimentConfig(muOffset=0.25))
action.perform_action(self.d)
self.assertEqual(self.d.geometry.mu, 3.75)
action = ApplyParameterOverwrites(ExperimentConfig(sampleModel='air | Si | Fe'))
action.update_header(self.header)
self.assertIsNotNone(self.header.sample.model)
def test_apply_sample_model_file(self):
if os.path.isfile('test.yaml'):
os.remove('test.yaml')
action = ApplyParameterOverwrites(ExperimentConfig(sampleModel='test.yaml'))
action.update_header(self.header)
self.assertIsNone(self.header.sample.model)
with open('test.yaml', 'w') as fh:
fh.write("""stack: air | Si | Fe""")
try:
action = ApplyParameterOverwrites(ExperimentConfig(sampleModel='test.yaml'))
action.update_header(self.header)
self.assertEqual(self.header.sample.model.stack, 'air | Si | Fe')
finally:
os.remove('test.yaml')
def test_tof_time_correction(self):
action = TofTimeCorrection()
with self.assertRaises(ValueError):
action.perform_action(self.d)
new_events = append_fields(self.d.data.events, [('delta', np.float64)])
new_events.delta = 10.0
self.d.data.events = new_events
tof_before = self.d.data.events.tof.copy()
action.perform_action(self.d)
np.testing.assert_allclose(
self.d.data.events.tof,
tof_before - (10.0 / 180.0) * self.d.timing.tau
)
self.d.create_data()
new_events = append_fields(self.d.data.events, [('delta', np.float64)])
new_events.delta = 10.0
self.d.data.events = new_events
tof_before = self.d.data.events.tof.copy()
action = TofTimeCorrection(correct_chopper_opening=False)
action.perform_action(self.d)
np.testing.assert_allclose(
self.d.data.events.tof,
tof_before - (self.d.geometry.kad / 180.0) * self.d.timing.tau
)
def test_apply_mask(self):
self.d.data.events = self.d.data.events[:6].copy()
self.d.data.events.mask[:] = [0, 1, 2, 3, 4, 5]
action = ApplyMask()
action.perform_action(self.d)
self.assertEqual(self.d.data.events.shape[0], 1)
self.assertEqual(self.d.data.events.mask[0], 0)
self.d.create_data()
self.d.data.events = self.d.data.events[:6].copy()
self.d.data.events.mask[:] = [0, 1, 2, 3, 4, 5]
action = ApplyMask(bitmask_filter=EVENT_BITMASKS['MonitorThreshold'])
action.perform_action(self.d)
np.testing.assert_array_equal(self.d.data.events.mask, np.array([0, EVENT_BITMASKS['MonitorThreshold']],
dtype=np.int32))
def test_merge_frames(self):
action = MergeFrames(lamdaCut=0.0)
action.perform_action(self.d)
self.assertEqual(self.d.data.events.tof.shape, self.d.orig_data.events.tof.shape)
np.testing.assert_array_compare(lambda x,y: x<=y, self.d.data.events.tof, self.d.orig_data.events.tof)
self.assertTrue((-self.d.timing.tau<=self.d.data.events.tof).all())
np.testing.assert_array_less(self.d.data.events.tof, self.d.timing.tau)
action = MergeFrames(lamdaCut=2.0)
self.d.data.events.tof = self.d.orig_data.events.tof[:]
action.perform_action(self.d)
tofCut = 2.0*self.d.geometry.chopperDetectorDistance/const.hdm*1e-13
self.assertTrue((tofCut-self.d.timing.tau<=self.d.data.events.tof).all())
self.assertTrue((self.d.data.events.tof<=tofCut+self.d.timing.tau).all())
def test_analyze_pixel_ids(self):
action = AnalyzePixelIDs((1000, 1001))
action.perform_action(self.d)
self.assertIn('detZ', self.d.data.events.dtype.names)
self.assertIn('detXdist', self.d.data.events.dtype.names)
self.assertIn('delta', self.d.data.events.dtype.names)
self.assertEqual(
np.bitwise_and(self.d.data.events.mask, EVENT_BITMASKS['yRange']).astype(bool).sum(),
self.d.data.events.shape[0]
)
# TODO: maybe add a test actually checking correct detector-id resolution
def test_calculate_wavelength(self):
action = CalculateWavelength((3.0, 5.0))
with self.assertRaises(ValueError):
action.perform_action(self.d)
new_events = append_fields(self.d.data.events, [('detXdist', np.float64)])
new_events.detXdist = 0.0
self.d.data.events = new_events
action.perform_action(self.d)
self.assertIn('lamda', self.d.data.events.dtype.names)
flt = self.d.data.events.mask!=EVENT_BITMASKS['LamdaRange']
# check all wavelength in range not filtered
np.testing.assert_array_less(self.d.data.events.lamda[flt], 5.0)
np.testing.assert_array_less(3.0, self.d.data.events.lamda[flt])
# check all wavelength out of range filtered
flt = self.d.data.events.mask==EVENT_BITMASKS['LamdaRange']
self.assertTrue(((self.d.data.events.lamda[flt]<3.0)|(self.d.data.events.lamda[flt]>5.0)).all())
def test_calculate_q(self):
action = CalculateQ(IncidentAngle.alphaF)
with self.assertRaises(ValueError):
action.perform_action(self.d)
# TODO: add checks for actual resulting values
new_events = append_fields(self.d.data.events, [('lamda', np.float64), ('delta', np.float64)])
new_events.lamda = 5.0
new_events.delta = 0.0
self.d.data.events = new_events
action.perform_action(self.d)
self.assertIn('qz', self.d.data.events.dtype.names)
self.assertNotIn('qx', self.d.data.events.dtype.names)
action.update_header(self.header)
self.assertEqual(self.header.measurement_scheme, 'angle- and energy-dispersive')
self.d.create_data()
new_events = append_fields(self.d.data.events, [('lamda', np.float64), ('delta', np.float64)])
new_events.lamda = 5.0
new_events.delta = 0.0
self.d.data.events = new_events
action = CalculateQ(IncidentAngle.mu)
action.perform_action(self.d)
self.assertIn('qz', self.d.data.events.dtype.names)
self.assertIn('qx', self.d.data.events.dtype.names)
action.update_header(self.header)
self.assertEqual(self.header.measurement_scheme, 'energy-dispersive')
self.d.create_data()
new_events = append_fields(self.d.data.events, [('lamda', np.float64), ('delta', np.float64)])
new_events.lamda = 5.0
new_events.delta = 0.0
self.d.data.events = new_events
action = CalculateQ(IncidentAngle.nu)
action.perform_action(self.d)
self.assertIn('qz', self.d.data.events.dtype.names)
self.assertNotIn('qx', self.d.data.events.dtype.names)
action.update_header(self.header)
self.assertEqual(self.header.measurement_scheme, 'energy-dispersive')
def test_filter_qz_range(self):
action = FilterQzRange((0.1, 0.2))
with self.assertRaises(ValueError):
action.perform_action(self.d)
self.d.data.events = self.d.data.events[:5].copy()
new_events = append_fields(self.d.data.events, [('qz', np.float64)])
new_events.qz = np.array([0.05, 0.1, 0.15, 0.2, 0.25])
self.d.data.events = new_events
action.perform_action(self.d)
np.testing.assert_array_equal(
self.d.data.events.mask,
np.array([1, 0, 0, 0, 1], dtype=np.int32) * EVENT_BITMASKS['qRange']
)

View File

@@ -61,7 +61,7 @@ class FullAmorTest(TestCase):
reduction_config = options.ReflectivityReductionConfig(
normalisationMethod=self._field_defaults['ReflectivityReductionConfig']['normalisationMethod'],
qResolution=0.01,
qzRange=self._field_defaults['ReflectivityReductionConfig']['qzRange'],
qzRange=(0.01, 0.15),
thetaRange=(-0.75, 0.75),
fileIdentifier=["6003-6005"],
scale=[1],