Files
eos/eos/helpers.py
Artur Glavic 5e96a20f23
All checks were successful
Unit Testing / test (3.10) (push) Successful in 54s
Unit Testing / test (3.11) (push) Successful in 49s
Unit Testing / test (3.8) (push) Successful in 50s
Unit Testing / test (3.12) (push) Successful in 55s
Unit Testing / test (3.9) (push) Successful in 54s
Evaluating ToF to extract sub-frames and assign to correct neutron pulse not chopper pulse (#4)
Interpolate neutorn times between chopper pulses and assign by ToF. If ToF too short (long wavelengths from previous pulse), assign them to the previous sub-pulse.

Reviewed with Jochen

Reviewed-on: #4
Co-authored-by: Artur Glavic <artur.glavic@psi.ch>
Co-committed-by: Artur Glavic <artur.glavic@psi.ch>
2026-03-10 15:00:10 +01:00

40 lines
1.5 KiB
Python

"""
Helper functions used during calculations. Uses numba enhanced functions if available, otherwise numpy based
fallback is imported.
"""
import numpy as np
from .event_data_types import EventDatasetProtocol, append_fields
try:
from .helpers_numba import merge_frames, extract_walltime, filter_project_x, \
calculate_derived_properties_focussing, merge_frames_w_index
except ImportError:
import logging
logging.warning('Cannot import numba enhanced functions, is it installed?')
from .helpers_fallback import merge_frames, extract_walltime, filter_project_x, \
calculate_derived_properties_focussing, merge_frames_w_index
def add_log_to_pulses(key, dataset: EventDatasetProtocol):
"""
Add a log value for each pulse to the pulses array.
"""
pulses = dataset.data.pulses
log_data = dataset.data.device_logs[key]
if log_data.time[0]>0:
logTimeS = np.hstack([[0], log_data.time, [pulses.time[-1]+1]])
logValues = np.hstack([[log_data.value[0]], log_data.value])
else:
logTimeS = np.hstack([log_data.time, [pulses.time[-1]+1]])
logValues = log_data.value
pulseLogS = np.zeros(pulses.time.shape[0], dtype=log_data.value.dtype)
j = 0
for i, ti in enumerate(pulses.time):
# find the last current item that was before this pulse
while ti>=logTimeS[j+1]:
j += 1
pulseLogS[i] = logValues[j]
pulses = append_fields(pulses, [(key, pulseLogS.dtype)])
pulses[key] = pulseLogS
dataset.data.pulses = pulses