Dynamically extend event recarray and only filter events by setting mask values. Masks are now bitmasks to track which filter applied where.

This commit is contained in:
2025-10-02 16:41:09 +02:00
parent 5ecdecfe24
commit 2a89e58698
4 changed files with 105 additions and 59 deletions

View File

@@ -5,9 +5,10 @@ import numpy as np
import logging
from typing import Tuple
from numpy.lib.recfunctions import rec_append_fields
from . import const
from .event_data_types import EventDataAction, EventDatasetProtocol, EVENT_TYPE, ANA_EVENT_TYPE, FINAL_EVENT_TYPE
from .event_data_types import EventDataAction, EventDatasetProtocol, append_fields, EVENT_BITMASKS
from .helpers import filter_project_x
from .instrument import Detector
from .options import IncidentAngle
@@ -20,22 +21,18 @@ class AnalyzePixelIDs(EventDataAction):
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if d.events.dtype != EVENT_TYPE:
raise ValueError("AnalyzeEventData only works on raw AmorEventData, this dataset has already been altered")
pixelLookUp = self.resolve_pixels()
# TODO: change numba implementation to use native pixelID type
(detZ, detXdist, delta, mask) = filter_project_x(
pixelLookUp, d.events.pixelID.astype(np.int64), self.yRange[0], self.yRange[1]
)
ana_events = np.recarray(d.events.shape, dtype=ANA_EVENT_TYPE)
# copy old data
for field in d.events.dtype.fields.keys():
ana_events[field] = d.events[field]
ana_events = append_fields(d.events, [
('detZ', detZ.dtype), ('detXdist', detXdist.dtype), ('delta', delta.dtype)])
# add analysis per event
ana_events.detZ = detZ
ana_events.detXdist = detXdist
ana_events.delta = delta
ana_events.mask = mask
ana_events.mask += np.logical_not(mask)*EVENT_BITMASKS['yRange']
d.events = ana_events
dataset.geometry.delta_z = self.delta_z
@@ -60,36 +57,44 @@ class TofTimeCorrection(EventDataAction):
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if d.events.dtype != ANA_EVENT_TYPE:
raise ValueError("TofTimeCorrection requires dataset with analyzed events, perform AnalyzedEventData first")
if self.correct_chopper_opening:
d.events.tof -= ( d.events.delta / 180. ) * dataset.timing.tau
else:
d.events.tof -= ( dataset.geometry.kad / 180. ) * dataset.timing.tau
class WavelengthAndQ(EventDataAction):
def __init__(self, lambdaRange: Tuple[float, float], incidentAngle: IncidentAngle):
class CalculateWavelength(EventDataAction):
def __init__(self, lambdaRange: Tuple[float, float]):
self.lambdaRange = lambdaRange
self.incidentAngle = incidentAngle
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if d.events.dtype != ANA_EVENT_TYPE:
raise ValueError("WavelengthAndQ requires dataset with analyzed events, perform AnalyzedEventData first")
if not 'detXdist' in dataset.data.events.dtype.names:
raise ValueError("CalculateWavelength requires dataset with analyzed pixels, perform AnalyzePixelIDs first")
self.lamdaMax = const.lamdaCut+1.e13*dataset.timing.tau*const.hdm/(dataset.geometry.chopperDetectorDistance+124.)
# lambda
lamda = (1.e13*const.hdm)*d.events.tof/(dataset.geometry.chopperDetectorDistance+d.events.detXdist)
final_events = np.recarray(d.events.shape, dtype=FINAL_EVENT_TYPE)
# copy old data
for field in d.events.dtype.fields.keys():
final_events[field] = d.events[field]
final_events = append_fields(d.events, [('lamda', np.float64)])
# add analysis per event
final_events.lamda = lamda
final_events.mask &= (self.lambdaRange[0]<=lamda) & (lamda<=self.lambdaRange[1])
final_events.mask += EVENT_BITMASKS["LamdaRange"]*(
(self.lambdaRange[0]>lamda) | (lamda>self.lambdaRange[1]))
d.events = final_events
class CalculateQ(EventDataAction):
def __init__(self, incidentAngle: IncidentAngle):
self.incidentAngle = incidentAngle
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if not 'lamda' in dataset.data.events.dtype.names:
raise ValueError("CalculateQ requires dataset with analyzed wavelength, perform CalculateWavelength first")
lamda = d.events.lamda
final_events = append_fields(d.events, [('qz', np.float64)])
# alpha_f
# q_z
@@ -103,6 +108,7 @@ class WavelengthAndQ(EventDataAction):
alphaF_e = dataset.geometry.nu - dataset.geometry.mu + d.events.delta
alphaI = dataset.geometry.kap + dataset.geometry.kad + dataset.geometry.mu
final_events.qz = 2*np.pi * ((np.sin(np.deg2rad(alphaF_e)) + np.sin(np.deg2rad(alphaI)))/lamda)
final_events = append_fields(final_events, [('qx', np.float64)])
final_events.qx = 2*np.pi * ((np.cos(np.deg2rad(alphaF_e)) - np.cos(np.deg2rad(alphaI)))/lamda)
dataset.data.events = final_events
@@ -119,19 +125,30 @@ class FilterQzRange(EventDataAction):
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if d.events.dtype != FINAL_EVENT_TYPE:
raise ValueError("FilterQzRange requires dataset with fully analyzed events, perform WavelengthAndQ first")
if not 'qz' in dataset.data.events.dtype.names:
raise ValueError("FilterQzRange requires dataset with qz values per events, perform WavelengthAndQ first")
if self.qzRange[1]<0.5:
d.events.mask &= (self.qzRange[0]<=d.events.qz) & (d.events.qz<=self.qzRange[1])
d.events.mask += EVENT_BITMASKS["qRange"]*((self.qzRange[0]>d.events.qz) | (d.events.qz>self.qzRange[1]))
class ApplyMask(EventDataAction):
def __init__(self, bitmask_filter=None):
self.bitmask_filter = bitmask_filter
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if not 'mask' in d.events.dtype.names:
logging.debug("ApplyMask performed on dataset without mask")
return
logging.info(f' number of events: total = {d.events.shape[0]:7d}, '
f'filtered = {np.logical_not(d.events.mask).sum():7d}')
d.events = d.events[d.events.mask]
f'filtered = {(d.events.mask!=0).sum():7d}')
if logging.getLogger().level == logging.DEBUG:
# only run this calculation if debug level is actually active
filtered_by_mask = {}
for key, value in EVENT_BITMASKS.items():
filtered_by_mask[key] = ((d.events.mask & value)!=0).sum()
logging.debug(f" Removed by filters: {filtered_by_mask}")
if self.bitmask_filter is None:
d.events = d.events[d.events.mask==0]
else:
# remove the provided bitmask_filter bits from the events
# this means that all bits that are set in bitmask_filter will NOT be used to filter events
fltr = (d.events.mask & (~self.bitmask_filter)) == 0
d.events = d.events[fltr]

View File

@@ -1,7 +1,7 @@
"""
Specify the data type and protocol used for event datasets.
"""
from typing import List, Optional, Protocol
from typing import List, Optional, Protocol, Tuple
from dataclasses import dataclass
from .header import Header
from abc import ABC, abstractmethod
@@ -31,26 +31,36 @@ class AmorTiming:
tau: float
# Structured datatypes used for event streams
EVENT_TYPE = np.dtype([('tof', np.float64),('pixelID', np.uint32), ('wallTime', np.int64)])
EVENT_TYPE = np.dtype([('tof', np.float64), ('pixelID', np.uint32), ('mask', np.int32)])
PACKET_TYPE = np.dtype([('start_index', np.uint32), ('Time', np.int64)])
PULSE_TYPE = np.dtype([('time', np.int64), ('monitor', np.float32)])
PC_TYPE = np.dtype([('current', np.float32), ('time', np.int64)])
# analyzed event sreams with extra attributes
ANA_EVENT_TYPE = np.dtype([('tof', np.float64),('pixelID', np.uint32), ('wallTime', np.int64),
('detZ', np.float64), ('detXdist', np.float64), ('delta', np.float64),
('mask', bool)])
FINAL_EVENT_TYPE = np.dtype([('tof', np.float64),('pixelID', np.uint32), ('wallTime', np.int64),
('detZ', np.float64), ('detXdist', np.float64), ('delta', np.float64),
('mask', bool),
('lamda', np.float64), ('qz', np.float64), ('qx', np.float64), ])
# define the bitmask for individual event filters
# Each filter sets its own bit in the per-event 'mask' field, so a single
# event can be flagged by several filters at once; mask == 0 means the
# event passed every filter (see ApplyMask, which keeps mask==0 events).
EVENT_BITMASKS = {
    'MonitorThreshold': 1,  # pulse rejected by the low-beam monitor threshold
    'StrangeTimes': 2,  # time of flight larger than 2*tau
    'yRange': 4,  # pixel outside the configured detector y-range
    'LamdaRange': 8,  # wavelength outside the configured lambdaRange
    'qRange': 16,  # qz outside the configured qzRange
}
def append_fields(input: np.recarray, new_fields: List[Tuple[str, np.dtype]]) -> np.recarray:
    """Return a copy of *input* extended by the columns in *new_fields*.

    The numpy.lib.recfunctions helpers fail on these recarrays, so the
    extension is done manually: a fresh recarray with the combined dtype
    is allocated, all existing columns are copied over, and the new
    columns are zero-initialized (instead of containing arbitrary
    memory) so callers that forget to fill a column still see defined
    values.

    :param input: recarray to extend; any shape is supported.
    :param new_fields: list of (name, dtype) pairs to append.
    :return: new recarray with dtype = input's fields + new_fields.
    """
    # rebuild the existing dtype as (name, dtype) pairs in field order;
    # dtype.fields maps name -> (dtype, offset[, title]), index 0 is the dtype
    combined = [(name, input.dtype.fields[name][0]) for name in input.dtype.names]
    combined += list(new_fields)
    # np.zeros guarantees the appended columns start out defined;
    # input.shape (not len) keeps this working for ND event arrays
    output = np.zeros(input.shape, dtype=combined).view(np.recarray)
    for name in input.dtype.names:
        output[name] = input[name]
    return output
@dataclass
class AmorEventStream:
events: np.recarray # EVENT_TYPE
packets: np.recarray # PACKET_TYPE
pulses: Optional[np.recarray] = None # PULSE_TYPE
proton_current: Optional[np.recarray] = None # PC_TYPE
pulses: np.recarray # PULSE_TYPE
proton_current: np.recarray # PC_TYPE
class EventDatasetProtocol(Protocol):
"""

View File

@@ -4,11 +4,12 @@ Calculations performed on AmorEventData.
import logging
import os
import numpy as np
# from numpy.lib.recfunctions import rec_append_fields, append_fields
from . import const
from .header import Header
from .options import ExperimentConfig, MonitorType
from .event_data_types import EventDatasetProtocol, EventDataAction
from .event_data_types import EventDatasetProtocol, EventDataAction, append_fields, EVENT_BITMASKS
from .helpers import merge_frames, extract_walltime
class ApplyParameterOverwrites(EventDataAction):
@@ -58,16 +59,21 @@ class CorrectChopperPhase(EventDataAction):
class ExtractWalltime(EventDataAction):
def perform_action(self, dataset: EventDatasetProtocol) ->None:
# TODO: fix numba type definition after refactor
dataset.data.events.wallTime = extract_walltime(dataset.data.events.tof,
dataset.data.packets.start_index.astype(np.uint64),
dataset.data.packets.Time)
wallTime = extract_walltime(dataset.data.events.tof,
dataset.data.packets.start_index.astype(np.uint64),
dataset.data.packets.Time)
logging.debug(f' expending event stream by wallTime')
new_events = append_fields(dataset.data.events, [('wallTime', wallTime.dtype)])
new_events.wallTime = wallTime
dataset.data.events = new_events
class CorrectSeriesTime(EventDataAction):
def __init__(self, seriesStartTime):
self.seriesStartTime = np.int64(seriesStartTime)
def perform_action(self, dataset: EventDatasetProtocol)->None:
if not 'wallTime' in dataset.data.events.dtype.names:
raise ValueError("CorrectTimeSeries requires walltTime to be extracted, please run ExtractWalltime first")
dataset.data.pulses.time -= self.seriesStartTime
dataset.data.events.wallTime -= self.seriesStartTime
dataset.data.proton_current.time -= self.seriesStartTime
@@ -84,6 +90,9 @@ class AssociatePulseWithMonitor(EventDataAction):
def perform_action(self, dataset: EventDatasetProtocol)->None:
logging.debug(f' using monitor type {self.monitorType}')
if self.monitorType in [MonitorType.proton_charge or MonitorType.debug]:
if not 'wallTime' in dataset.data.events.dtype.names:
raise ValueError(
"AssociatePulseWithMonitor requires walltTime to be extracted, please run ExtractWalltime first")
monitorPerPulse = self.get_current_per_pulse(dataset.data.pulses.time,
dataset.data.proton_current.time,
dataset.data.proton_current.current)\
@@ -107,12 +116,12 @@ class AssociatePulseWithMonitor(EventDataAction):
def monitor_threshold(self, dataset):
# TODO: evaluate if this should actually do masking instead
goodTimeS = dataset.data.pulses.time[dataset.data.pulses.monitor!=0]
filter_e = np.isin(dataset.data.events.wallTime, goodTimeS)
dataset.data.events = dataset.data.events[filter_e]
filter_e = np.logical_not(np.isin(dataset.data.events.wallTime, goodTimeS))
dataset.data.events.mask += EVENT_BITMASKS['MonitorThreshold']*filter_e
logging.info(f' low-beam (<{self.lowCurrentThreshold} mC) rejected pulses: '
f'{dataset.data.pulses.monitor.shape[0]-goodTimeS.shape[0]} '
f'out of {dataset.data.pulses.monitor.shape[0]}')
logging.info(f' with {filter_e.shape[0]-dataset.data.events.shape[0]} events')
logging.info(f' with {filter_e.sum()} events')
if goodTimeS.shape[0]:
logging.info(f' average counts per pulse = {dataset.data.events.shape[0]/goodTimeS.shape[0]:7.1f}')
else:
@@ -135,10 +144,10 @@ class AssociatePulseWithMonitor(EventDataAction):
class FilterStrangeTimes(EventDataAction):
def perform_action(self, dataset: EventDatasetProtocol)->None:
filter_e = (dataset.data.events.tof<=2*dataset.timing.tau)
dataset.data.events = dataset.data.events[filter_e]
if not filter_e.all():
logging.warning(f' strange times: {np.logical_not(filter_e).sum()}')
filter_e = np.logical_not(dataset.data.events.tof<=2*dataset.timing.tau)
dataset.data.events.mask += EVENT_BITMASKS['StrangeTimes']*filter_e
if filter_e.any():
logging.warning(f' strange times: {filter_e.sum()}')
class MergeFrames(EventDataAction):

View File

@@ -294,7 +294,11 @@ class AmorEventData:
events = np.recarray(nevts, dtype=EVENT_TYPE)
events.tof = np.array(self.hdf['/entry1/Amor/detector/data/event_time_offset'][self.first_index:self.last_index+1])/1.e9
events.pixelID = self.hdf['/entry1/Amor/detector/data/event_id'][self.first_index:self.last_index+1]
self.data = AmorEventStream(events, packets)
events.mask = 0
pulses = self.read_chopper_trigger_stream()
current = self.read_proton_current_stream()
self.data = AmorEventStream(events, packets, pulses, current)
def read_chopper_trigger_stream(self):
chopper1TriggerTime = np.array(self.hdf['entry1/Amor/chopper/ch2_trigger/event_time_zero'][:-2], dtype=np.int64)
@@ -310,11 +314,12 @@ class AmorEventData:
pulseTimeS = np.arange(startTime, stopTime, self.timing.tau*1e9, dtype=np.int64)
pulses = np.recarray(pulseTimeS.shape, dtype=PULSE_TYPE)
pulses.time = pulseTimeS
pulses.monitor = 1. # default is monitor pulses as it requires no calculation
# apply filter in case the events were filtered
if self.first_index>0 or not self.EOF:
pulses = pulses[(pulses.time>=self.data.packets.Time[0])&(pulses.time<=self.data.packets.Time[-1])]
self.data.pulses = pulses
self.eventStartTime = startTime
return pulses
def read_proton_current_stream(self):
proton_current = np.recarray(self.hdf['entry1/Amor/detector/proton_current/time'].shape, dtype=PC_TYPE)
@@ -323,7 +328,7 @@ class AmorEventData:
if self.first_index>0 or not self.EOF:
proton_current = proton_current[(proton_current.time>=self.data.packets.Time[0])&
(proton_current.time<=self.data.packets.Time[-1])]
self.data.proton_current = proton_current
return proton_current
def info(self):
output = ""
@@ -398,6 +403,7 @@ class AmorData:
def prepare_actions(self):
# setup all actions performed in origin AmorData, time correction requires first dataset start time
# The order of these corrections matter as some rely on parameters modified before
self.time_correction = eh.CorrectSeriesTime(0)
self.event_actions = eh.ApplyParameterOverwrites(self.config) # some actions use instrument parameters, change before that
self.event_actions |= eh.CorrectChopperPhase()
@@ -408,7 +414,8 @@ class AmorData:
self.event_actions |= eh.MergeFrames()
self.event_actions |= ea.AnalyzePixelIDs(self.config.yRange)
self.event_actions |= ea.TofTimeCorrection(self.config.incidentAngle==IncidentAngle.alphaF)
self.event_actions |= ea.WavelengthAndQ(self.config.lambdaRange, self.config.incidentAngle)
self.event_actions |= ea.CalculateWavelength(self.config.lambdaRange)
self.event_actions |= ea.CalculateQ(self.config.incidentAngle)
self.event_actions |= ea.FilterQzRange(self.config.qzRange)
self.event_actions |= ea.ApplyMask()
@@ -452,7 +459,10 @@ class AmorData:
self.lamda_e = ev.lamda
self.wallTime_e = ev.wallTime
self.qz_e = ev.qz
self.qx_e = ev.qx
try:
self.qx_e = ev.qx
except AttributeError:
self.qx_e = 0.*self.qz_e
self.pulseTimeS = ds.data.pulses.time
self.monitorPerPulse = ds.data.pulses.monitor