Move event data type specification in separate method and decouple event handling from file reader by using a Protocol

This commit is contained in:
2025-10-01 10:33:03 +02:00
parent 7cf7ff8e61
commit c540fa16fe
4 changed files with 98 additions and 82 deletions

View File

@@ -1,5 +1,5 @@
"""
Define a event dataformat that performs reduction actions like wavelength calculation on per-event basis.
Define an event dataformat that performs reduction actions like wavelength calculation on per-event basis.
"""
import numpy as np
import logging
@@ -7,30 +7,18 @@ import logging
from typing import Tuple
from . import const
from .file_reader import AmorEventData, EVENT_TYPE
from .event_handling import EventDataAction
from .event_data_types import EventDataAction, EventDatasetProtocol, EVENT_TYPE, ANA_EVENT_TYPE, FINAL_EVENT_TYPE
from .helpers import filter_project_x
from .instrument import Detector
from .options import IncidentAngle
from .header import Header
# Structured datatypes used for event streams
ANA_EVENT_TYPE = np.dtype([('tof', np.float64),('pixelID', np.uint32), ('wallTime', np.int64),
('detZ', np.float64), ('detXdist', np.float64), ('delta', np.float64),
('mask', bool)])
FINAL_EVENT_TYPE = np.dtype([('tof', np.float64),('pixelID', np.uint32), ('wallTime', np.int64),
('detZ', np.float64), ('detXdist', np.float64), ('delta', np.float64),
('mask', bool),
('lamda', np.float64), ('qz', np.float64), ('qx', np.float64), ])
class AnalyzedEventData(EventDataAction):
def __init__(self, yRange: Tuple[int, int]):
self.yRange = yRange
def perform_action(self, dataset: AmorEventData) ->None:
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if d.events.dtype != EVENT_TYPE:
raise ValueError("AnalyzeEventData only works on raw AmorEventData, this dataset has already been altered")
@@ -70,7 +58,7 @@ class TofTimeCorrection(EventDataAction):
def __init__(self, correct_chopper_opening: bool = True):
self.correct_chopper_opening = correct_chopper_opening
def perform_action(self, dataset: AmorEventData) ->None:
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if d.events.dtype != ANA_EVENT_TYPE:
raise ValueError("TofTimeCorrection requires dataset with analyzed events, perform AnalyzedEventData first")
@@ -85,7 +73,7 @@ class WavelengthAndQ(EventDataAction):
self.lambdaRange = lambdaRange
self.incidentAngle = incidentAngle
def perform_action(self, dataset: AmorEventData) ->None:
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if d.events.dtype != ANA_EVENT_TYPE:
raise ValueError("WavelengthAndQ requires dataset with analyzed events, perform AnalyzedEventData first")
@@ -129,7 +117,7 @@ class FilterQzRange(EventDataAction):
def __init__(self, qzRange: Tuple[float, float]):
self.qzRange = qzRange
def perform_action(self, dataset: AmorEventData) ->None:
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if d.events.dtype != FINAL_EVENT_TYPE:
raise ValueError("FilterQzRange requires dataset with fully analyzed events, perform WavelengthAndQ first")
@@ -138,7 +126,7 @@ class FilterQzRange(EventDataAction):
d.events.mask &= (self.qzRange[0]<=d.events.qz) & (d.events.qz<=self.qzRange[1])
class ApplyMask(EventDataAction):
def perform_action(self, dataset: AmorEventData) ->None:
def perform_action(self, dataset: EventDatasetProtocol) ->None:
d = dataset.data
if not 'mask' in d.events.dtype.names:
logging.debug("ApplyMask performed on dataset without mask")

View File

@@ -0,0 +1,79 @@
"""
Specify the data type and protocol used for event datasets.
"""
from typing import Optional, Protocol
from dataclasses import dataclass
from .header import Header
from abc import ABC, abstractmethod
import numpy as np
import logging
@dataclass
class AmorGeometry:
mu:float
nu:float
kap:float
kad:float
div:float
chopperSeparation: float
detectorDistance: float
chopperDetectorDistance: float
delta_z: Optional[float] = None
@dataclass
class AmorTiming:
ch1TriggerPhase: float
ch2TriggerPhase: float
chopperSpeed: float
chopperPhase: float
tau: float
# Structured datatypes used for event streams
EVENT_TYPE = np.dtype([('tof', np.float64),('pixelID', np.uint32), ('wallTime', np.int64)])
PACKET_TYPE = np.dtype([('start_index', np.uint32), ('Time', np.int64)])
PULSE_TYPE = np.dtype([('time', np.int64), ('monitor', np.float32)])
PC_TYPE = np.dtype([('current', np.float32), ('time', np.int64)])
# analyzed event sreams with extra attributes
ANA_EVENT_TYPE = np.dtype([('tof', np.float64),('pixelID', np.uint32), ('wallTime', np.int64),
('detZ', np.float64), ('detXdist', np.float64), ('delta', np.float64),
('mask', bool)])
FINAL_EVENT_TYPE = np.dtype([('tof', np.float64),('pixelID', np.uint32), ('wallTime', np.int64),
('detZ', np.float64), ('detXdist', np.float64), ('delta', np.float64),
('mask', bool),
('lamda', np.float64), ('qz', np.float64), ('qx', np.float64), ])
@dataclass
class AmorEventStream:
events: np.recarray # EVENT_TYPE
packets: np.recarray # PACKET_TYPE
pulses: Optional[np.recarray] = None # PULSE_TYPE
proton_current: Optional[np.recarray] = None # PC_TYPE
class EventDatasetProtocol(Protocol):
"""
Minimal attributes a dataset needs to provide to work with EventDataAction
"""
geometry: AmorGeometry
timing: AmorTiming
data: AmorEventStream
class EventDataAction(ABC):
"""
Abstract base class used for actions applied to an EventDatasetProtocol based objects.
Each action can optionally modify the header information.
"""
def __call__(self, dataset: EventDatasetProtocol)->None:
logging.debug(f" Enter action {self.__class__.__name__} on {dataset!r}")
self.perform_action(dataset)
@abstractmethod
def perform_action(self, dataset: EventDatasetProtocol)->None: ...
def update_header(self, header:Header)->None:
if hasattr(self, 'action_name'):
header.reduction.corrections.append(getattr(self, 'action_name'))

View File

@@ -4,35 +4,16 @@ Calculations performed on AmorEventData.
import logging
import numpy as np
from abc import ABC, abstractmethod
from .header import Header
from .options import MonitorType
from .file_reader import AmorEventData
from .event_data_types import EventDatasetProtocol, EventDataAction
from .helpers import merge_frames
class EventDataAction(ABC):
"""
Abstract base class used for actions applied to an AmorEventData object.
Each action can optionally modify the header information.
"""
def __call__(self, dataset: AmorEventData)->None:
logging.debug(f" Enter action {self.__class__.__name__} on {dataset!r}")
self.perform_action(dataset)
@abstractmethod
def perform_action(self, dataset: AmorEventData)->None: ...
def update_header(self, header:Header)->None:
if hasattr(self, 'action_name'):
header.reduction.corrections.append(getattr(self, 'action_name'))
class CorrectSeriesTime(EventDataAction):
def __init__(self, seriesStartTime):
self.seriesStartTime = np.int64(seriesStartTime)
def perform_action(self, dataset: AmorEventData)->None:
def perform_action(self, dataset: EventDatasetProtocol)->None:
dataset.data.pulses.time -= self.seriesStartTime
dataset.data.events.wallTime -= self.seriesStartTime
dataset.data.proton_current.time -= self.seriesStartTime
@@ -45,7 +26,7 @@ class AssociatePulseWithMonitor(EventDataAction):
self.monitorType = monitorType
self.lowCurrentThreshold = lowCurrentThreshold
def perform_action(self, dataset: AmorEventData)->None:
def perform_action(self, dataset: EventDatasetProtocol)->None:
logging.debug(f' using monitor type {self.monitorType}')
if self.monitorType in [MonitorType.proton_charge or MonitorType.debug]:
monitorPerPulse = self.get_current_per_pulse(dataset.data.pulses.time,
@@ -93,7 +74,7 @@ class AssociatePulseWithMonitor(EventDataAction):
class FilterStrangeTimes(EventDataAction):
def perform_action(self, dataset: AmorEventData)->None:
def perform_action(self, dataset: EventDatasetProtocol)->None:
filter_e = (dataset.data.events.tof<=2*dataset.timing.tau)
dataset.data.events = dataset.data.events[filter_e]
if not filter_e.all():
@@ -103,7 +84,7 @@ class MergeFrames(EventDataAction):
def __init__(self, tofCut:float):
self.tofCut = tofCut
def perform_action(self, dataset: AmorEventData)->None:
def perform_action(self, dataset: EventDatasetProtocol)->None:
total_offset = (self.tofCut +
dataset.timing.tau * (dataset.timing.ch1TriggerPhase + dataset.timing.chopperPhase/2)/180)
dataset.data.events.tof = merge_frames(dataset.data.events.tof, self.tofCut, dataset.timing.tau, total_offset)

View File

@@ -1,3 +1,7 @@
"""
Reading of Amor NeXus data files to extract metadata and event stream.
"""
import h5py
import numpy as np
import platform
@@ -5,8 +9,6 @@ import logging
import subprocess
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
from orsopy import fileio
from orsopy.fileio.model_language import SampleModel
@@ -14,6 +16,7 @@ from orsopy.fileio.model_language import SampleModel
from . import const
from .header import Header
from .helpers import extract_walltime
from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, PC_TYPE
try:
import zoneinfo
@@ -31,45 +34,12 @@ if platform.node().startswith('amor'):
else:
NICOS_CACHE_DIR = None
@dataclass
class AmorGeometry:
mu:float
nu:float
kap:float
kad:float
div:float
chopperSeparation: float
detectorDistance: float
chopperDetectorDistance: float
delta_z: Optional[float] = None
@dataclass
class AmorTiming:
ch1TriggerPhase: float
ch2TriggerPhase: float
chopperSpeed: float
chopperPhase: float
tau: float
# Structured datatypes used for event streams
EVENT_TYPE = np.dtype([('tof', np.float64),('pixelID', np.uint32), ('wallTime', np.int64)])
PACKET_TYPE = np.dtype([('start_index', np.uint32), ('Time', np.int64)])
PULSE_TYPE = np.dtype([('time', np.int64), ('monitor', np.float32)])
PC_TYPE = np.dtype([('current', np.float32), ('time', np.int64)])
@dataclass
class AmorEventStream:
events: np.recarray # EVENT_TYPE
packets: np.recarray # PACKET_TYPE
pulses: Optional[np.recarray] = None # PULSE_TYPE
proton_current: Optional[np.recarray] = None # PC_TYPE
class AmorEventData:
"""
Read one amor NeXus datafile and extract relevant header information.
Implements EventDatasetProtocol
"""
fileName: str
first_index: int
@@ -85,8 +55,6 @@ class AmorEventData:
data: AmorEventStream
startTime: np.int64
# attributes that will only be assigned by specific actions
monitorPerPulse:np.ndarray
def __init__(self, fileName, first_index=0, max_events=1_000_000):
self.fileName = fileName