Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c073fae269 | |||
| 447b81d09d | |||
| 003a8c04ce | |||
| 4cd25dec3d | |||
| b38eeb8eb3 | |||
| d722228079 | |||
| 493c2bf802 | |||
| c429429dd2 | |||
| 0550e725e8 | |||
| 5c8b9a8cd6 | |||
| 4e5d085ad7 | |||
| 2d8d1c66c6 | |||
| 14ea89e243 | |||
| 322c00ca54 | |||
| 0b7a56628f | |||
| 77641412ab | |||
| fc1bd66c3d | |||
| 95a1ffade4 | |||
| 4ee1cf7ea7 | |||
| c90bdd3316 |
@@ -2,5 +2,5 @@
|
||||
Package to handle data redction at AMOR instrument to be used by __main__.py script.
|
||||
"""
|
||||
|
||||
__version__ = '3.0.1'
|
||||
__date__ = '2025-10-10'
|
||||
__version__ = '3.0.3'
|
||||
__date__ = '2025-10-28'
|
||||
|
||||
@@ -41,6 +41,7 @@ class AmorHeader:
|
||||
|
||||
def __init__(self, fileName:Union[str, h5py.File, BinaryIO]):
|
||||
if type(fileName) is str:
|
||||
logging.warning(f' {fileName.split("/")[-1]}')
|
||||
self.hdf = h5py.File(fileName, 'r', swmr=True)
|
||||
elif type(fileName) is h5py.File:
|
||||
self.hdf = fileName
|
||||
@@ -128,7 +129,7 @@ class AmorHeader:
|
||||
mu = self._replace_if_missing('instrument_control_parameters/mu', 'mu', float)
|
||||
nu = self._replace_if_missing('instrument_control_parameters/nu', 'nu', float)
|
||||
kap = self._replace_if_missing('instrument_control_parameters/kappa', 'kappa', float)
|
||||
kad = self._replace_if_missing('instrument_control_parameters/kappa_offset', 'kad', float)
|
||||
kad = self._replace_if_missing('instrument_control_parameters/kappa_offset', 'kappa_offset', float)
|
||||
div = self._replace_if_missing('instrument_control_parameters/div', 'div', float)
|
||||
ch1TriggerPhase = self._replace_if_missing('chopper/ch1_trigger_phase', 'ch1_trigger_phase', float)
|
||||
ch2TriggerPhase = self._replace_if_missing('chopper/ch2_trigger_phase', 'ch2_trigger_phase', float)
|
||||
@@ -222,6 +223,8 @@ class AmorEventData(AmorHeader):
|
||||
|
||||
def __init__(self, fileName:Union[str, h5py.File, BinaryIO], first_index:int=0, max_events:int=100_000_000):
|
||||
if type(fileName) is str:
|
||||
#logging.warning(f' opening file {fileName}')
|
||||
logging.warning(f' {fileName.split("/")[-1]}')
|
||||
self.file_list = [fileName]
|
||||
hdf = h5py.File(fileName, 'r', swmr=True)
|
||||
elif type(fileName) is h5py.File:
|
||||
|
||||
165
eos/kafka_events.py
Normal file
165
eos/kafka_events.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""
|
||||
Collect AMOR detector events send via Kafka.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import numpy as np
|
||||
from threading import Thread, Event
|
||||
from time import time
|
||||
|
||||
from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, PC_TYPE
|
||||
|
||||
from uuid import uuid4
|
||||
from streaming_data_types.eventdata_ev44 import EventData
|
||||
from streaming_data_types.logdata_f144 import ExtractedLogData
|
||||
from streaming_data_types import deserialise_f144, deserialise_ev44
|
||||
from confluent_kafka import Consumer
|
||||
|
||||
from .header import Header
|
||||
|
||||
|
||||
try:
|
||||
from streaming_data_types.utils import get_schema
|
||||
except ImportError:
|
||||
from streaming_data_types.utils import _get_schema as get_schema
|
||||
|
||||
|
||||
KAFKA_BROKER = 'linkafka01.psi.ch:9092'
|
||||
AMOR_EVENTS = 'amor_ev44'
|
||||
AMOR_NICOS = 'AMOR_nicosForwarder'
|
||||
|
||||
class KafkaFrozenData:
|
||||
"""
|
||||
Represents event stream data from Kafka at a given time.
|
||||
Will be returned by KafkaEventData to be use in conjunction
|
||||
with data processing and projections.
|
||||
|
||||
Implements EventDatasetProtocol
|
||||
"""
|
||||
geometry: AmorGeometry
|
||||
timing: AmorTiming
|
||||
data: AmorEventStream
|
||||
|
||||
def __init__(self, geometry, timing, data, monitor=1.):
|
||||
self.geometry = geometry
|
||||
self.timing = timing
|
||||
self.data = data
|
||||
self.monitor = monitor
|
||||
|
||||
def append(self, other):
|
||||
raise NotImplementedError("can't append live datastream to other event data")
|
||||
|
||||
def update_header(self, header:Header):
|
||||
# maybe makes sense later, but for now just used for live vizualization
|
||||
...
|
||||
|
||||
class KafkaEventData(Thread):
|
||||
"""
|
||||
Read Nicos information and events from Kafka. Creates a background
|
||||
thread that listens to Kafka events and converts them to eos compatible information.
|
||||
"""
|
||||
geometry: AmorGeometry
|
||||
timing: AmorTiming
|
||||
events: np.recarray
|
||||
|
||||
def __init__(self):
|
||||
self.stop_event = Event()
|
||||
self.stop_counting = Event()
|
||||
self.new_events = Event()
|
||||
self.last_read = 0
|
||||
self.last_read_time = 0.
|
||||
self.start_time = time()
|
||||
self.consumer = Consumer(
|
||||
{'bootstrap.servers': 'linkafka01.psi.ch:9092',
|
||||
'group.id': uuid4()})
|
||||
self.consumer.subscribe([AMOR_EVENTS, AMOR_NICOS])
|
||||
self.geometry = AmorGeometry(1.0, 2.0, 0., 0., 1.5, 10.0, 4.0, 10.0)
|
||||
self.timing = AmorTiming(0., 0., 500., 0., 30./500.)
|
||||
# create empty dataset
|
||||
self.events = np.recarray(0, dtype=EVENT_TYPE)
|
||||
super().__init__()
|
||||
|
||||
def run(self):
|
||||
while not self.stop_event.is_set():
|
||||
messages = self.consumer.consume(10, timeout=1)
|
||||
for message in messages:
|
||||
self.process_message(message)
|
||||
|
||||
def process_message(self, message):
|
||||
if message.error():
|
||||
logging.info(f" received Kafka message with error: {message.error()}")
|
||||
return
|
||||
schema = get_schema(message.value())
|
||||
if message.topic()==AMOR_EVENTS and schema=='ev44':
|
||||
events:EventData = deserialise_ev44(message.value())
|
||||
self.add_events(events)
|
||||
self.new_events.set()
|
||||
logging.debug(f' new events {events}')
|
||||
elif message.topic()==AMOR_NICOS and schema=='f144':
|
||||
nicos_data:ExtractedLogData = deserialise_f144(message.value())
|
||||
if nicos_data.source_name in self.nicos_mapping.keys():
|
||||
logging.debug(f' {nicos_data.source_name} = {nicos_data.value}')
|
||||
self.update_instrument(nicos_data)
|
||||
|
||||
def add_events(self, events:EventData):
|
||||
"""
|
||||
Add new events to the Dataset. The object keeps raw events
|
||||
and only copies the latest set to the self.data object,
|
||||
this allows to run the event processing to be performed on a "clean"
|
||||
evnet stream each time.
|
||||
"""
|
||||
if self.stop_counting.is_set():
|
||||
return
|
||||
prev_size = self.events.shape[0]
|
||||
new_events = events.pixel_id.shape[0]
|
||||
self.events.resize(prev_size+new_events, refcheck=False)
|
||||
self.events.pixelID[prev_size:] = events.pixel_id
|
||||
self.events.mask[prev_size:] = 0
|
||||
self.events.tof[prev_size:] = events.time_of_flight/1.e9
|
||||
|
||||
nicos_mapping = {
|
||||
'mu': ('geometry', 'mu'),
|
||||
'nu': ('geometry', 'nu'),
|
||||
'kappa': ('geometry', 'kap'),
|
||||
'kappa_offset': ('geometry', 'kad'),
|
||||
'ch1_trigger_phase': ('timing', 'ch1TriggerPhase'),
|
||||
'ch2_trigger_phase': ('timing', 'ch2TriggerPhase'),
|
||||
'ch2_speed': ('timing', 'chopperSpeed'),
|
||||
'chopper_phase': ('timing', 'chopperPhase'),
|
||||
}
|
||||
|
||||
def update_instrument(self, nicos_data:ExtractedLogData):
|
||||
if nicos_data.source_name in self.nicos_mapping:
|
||||
attr, subattr = self.nicos_mapping[nicos_data.source_name]
|
||||
setattr(getattr(self, attr), subattr, nicos_data.value)
|
||||
if nicos_data.source_name=='ch2_speed':
|
||||
self.timing.tau = 30./self.timing.chopperSpeed
|
||||
|
||||
def monitor(self):
|
||||
return time()-self.start_time
|
||||
|
||||
def restart(self):
|
||||
# empty event buffer
|
||||
self.events = np.recarray(0, dtype=EVENT_TYPE)
|
||||
self.stop_counting.clear()
|
||||
self.last_read = 0
|
||||
self.start_time = time()
|
||||
self.new_events.clear()
|
||||
|
||||
def get_events(self, total_counts=False):
|
||||
packets = np.recarray(0, dtype=PACKET_TYPE)
|
||||
pulses = np.recarray(0, dtype=PULSE_TYPE)
|
||||
pc = np.recarray(0, dtype=PC_TYPE)
|
||||
if total_counts:
|
||||
last_read = 0
|
||||
else:
|
||||
last_read = self.last_read
|
||||
if last_read>=self.events.shape[0]:
|
||||
raise EOFError("No new events arrived")
|
||||
data = AmorEventStream(self.events[last_read:].copy(), packets, pulses, pc)
|
||||
self.last_read = self.events.shape[0]
|
||||
self.new_events.clear()
|
||||
t_now = time()
|
||||
monitor = t_now-self.last_read_time
|
||||
self.last_read_time = t_now
|
||||
return KafkaFrozenData(self.geometry, self.timing, data, monitor=monitor)
|
||||
283
eos/kafka_serializer.py
Normal file
283
eos/kafka_serializer.py
Normal file
@@ -0,0 +1,283 @@
|
||||
"""
|
||||
Allows to send eos projections to Kafka using ESS histogram serialization.
|
||||
|
||||
For histogram_h01 the message is build using:
|
||||
|
||||
hist = {
|
||||
"source": "some_source",
|
||||
"timestamp": 123456,
|
||||
"current_shape": [2, 5],
|
||||
"dim_metadata": [
|
||||
{
|
||||
"length": 2,
|
||||
"unit": "a",
|
||||
"label": "x",
|
||||
"bin_boundaries": np.array([10, 11, 12]),
|
||||
},
|
||||
{
|
||||
"length": 5,
|
||||
"unit": "b",
|
||||
"label": "y",
|
||||
"bin_boundaries": np.array([0, 1, 2, 3, 4, 5]),
|
||||
},
|
||||
],
|
||||
"last_metadata_timestamp": 123456,
|
||||
"data": np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]),
|
||||
"errors": np.array([[5, 4, 3, 2, 1], [10, 9, 8, 7, 6]]),
|
||||
"info": "info_string",
|
||||
}
|
||||
"""
|
||||
import logging
|
||||
from typing import List, Tuple, Union
|
||||
from threading import Thread, Event
|
||||
|
||||
import numpy as np
|
||||
import json
|
||||
from time import time
|
||||
from dataclasses import dataclass, asdict
|
||||
from streaming_data_types import histogram_hs01
|
||||
from confluent_kafka import Producer, Consumer
|
||||
|
||||
from uuid import uuid4
|
||||
|
||||
from .projection import TofZProjection, YZProjection
|
||||
|
||||
KAFKA_BROKER = 'linkafka01.psi.ch:9092'
|
||||
KAFKA_TOPICS = {
|
||||
'histogram': 'AMOR_histograms',
|
||||
'response': 'AMOR_histResponse',
|
||||
'command': 'AMOR_histCommands'
|
||||
}
|
||||
|
||||
def ktime():
|
||||
return int(time()*1_000)
|
||||
|
||||
@dataclass
|
||||
class DimMetadata:
|
||||
length: int
|
||||
unit: str
|
||||
label: str
|
||||
bin_boundaries: np.ndarray
|
||||
|
||||
@dataclass
|
||||
class HistogramMessage:
|
||||
source: str
|
||||
timestamp: int
|
||||
current_shape: Tuple[int, int]
|
||||
dim_metadata: Tuple[DimMetadata, DimMetadata]
|
||||
last_metadata_timestamp: int
|
||||
data: np.ndarray
|
||||
errors: np.ndarray
|
||||
info: str
|
||||
|
||||
def serialize(self):
|
||||
return histogram_hs01.serialise_hs01(asdict(self))
|
||||
|
||||
@dataclass
|
||||
class CommandMessage:
|
||||
msg_id: str
|
||||
|
||||
cmd=None
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, data):
|
||||
"""
|
||||
Uses the sub-class cmd attribute to select which message to retugn
|
||||
"""
|
||||
msg = dict([(ci.cmd, ci) for ci in cls.__subclasses__()])
|
||||
return msg[data['cmd']](**data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Stop(CommandMessage):
|
||||
hist_id: str
|
||||
id: str
|
||||
cmd:str = 'stop'
|
||||
|
||||
@dataclass
|
||||
class HistogramConfig:
|
||||
id: str
|
||||
type: str
|
||||
data_brokers: List[str]
|
||||
topic: str
|
||||
data_topics: List[str]
|
||||
tof_range: Tuple[float, float]
|
||||
det_range: Tuple[int, int]
|
||||
num_bins: int
|
||||
width: int
|
||||
height: int
|
||||
left_edges: list
|
||||
source: str
|
||||
|
||||
@dataclass
|
||||
class ConfigureHistogram(CommandMessage):
|
||||
histograms: List[HistogramConfig]
|
||||
start: int
|
||||
cmd:str = 'config'
|
||||
|
||||
def __post_init__(self):
|
||||
self.histograms = [HistogramConfig(**cfg) for cfg in self.histograms]
|
||||
|
||||
|
||||
class ESSSerializer:
|
||||
|
||||
def __init__(self):
|
||||
self.producer = Producer({
|
||||
'bootstrap.servers': KAFKA_BROKER,
|
||||
'message.max.bytes': 4_000_000,
|
||||
})
|
||||
self.consumer = Consumer({
|
||||
'bootstrap.servers': KAFKA_BROKER,
|
||||
"group.id": uuid4(),
|
||||
"default.topic.config": {"auto.offset.reset": "latest"},
|
||||
})
|
||||
self._active_histogram_yz = None
|
||||
self._active_histogram_tofz = None
|
||||
self.new_count_started = Event()
|
||||
self.count_stopped = Event()
|
||||
|
||||
self.consumer.subscribe([KAFKA_TOPICS['command']])
|
||||
|
||||
def process_message(self, message):
|
||||
if message.error():
|
||||
logging.error("Command Consumer Error: %s", message.error())
|
||||
else:
|
||||
command = json.loads(message.value().decode())
|
||||
try:
|
||||
command = CommandMessage.get_message(command)
|
||||
except Exception:
|
||||
logging.error(f'Could not interpret message: \n{command}', exc_info=True)
|
||||
return
|
||||
logging.info(command)
|
||||
resp = json.dumps({
|
||||
"msg_id": getattr(command, "id", None) or command.msg_id,
|
||||
"response": "ACK",
|
||||
"message": ""
|
||||
})
|
||||
self.producer.produce(
|
||||
topic=KAFKA_TOPICS['response'],
|
||||
value=resp
|
||||
)
|
||||
self.producer.flush()
|
||||
if isinstance(command, Stop):
|
||||
if command.hist_id == self._active_histogram_yz:
|
||||
self.count_stopped.set()
|
||||
else:
|
||||
return
|
||||
elif isinstance(command, ConfigureHistogram):
|
||||
for hist in command.histograms:
|
||||
if hist.topic == KAFKA_TOPICS['histogram']+'_YZ':
|
||||
self._active_histogram_yz = hist.id
|
||||
logging.debug(f" histogram data_topic: {hist.data_topics}")
|
||||
self._start = command.start
|
||||
self.count_stopped.clear()
|
||||
self.new_count_started.set()
|
||||
if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ':
|
||||
self._active_histogram_tofz = hist.id
|
||||
|
||||
def receive(self, timeout=5):
|
||||
rec = self.consumer.poll(timeout)
|
||||
if rec is not None:
|
||||
self.process_message(rec)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def receive_loop(self):
|
||||
while not self._stop_receiving.is_set():
|
||||
try:
|
||||
self.receive()
|
||||
except Exception:
|
||||
logging.error("Exception while receiving", exc_info=True)
|
||||
|
||||
def start_command_thread(self):
|
||||
self._stop_receiving = Event()
|
||||
self._command_thread = Thread(target=self.receive_loop)
|
||||
self._command_thread.start()
|
||||
|
||||
def end_command_thread(self, event=None):
|
||||
self._stop_receiving.set()
|
||||
self._command_thread.join()
|
||||
|
||||
def acked(self, err, msg):
|
||||
# We need to have callback to produce-method to catch server errors
|
||||
if err is not None:
|
||||
logging.warning("Failed to deliver message: %s: %s" % (str(msg), str(err)))
|
||||
else:
|
||||
logging.debug("Message produced: %s" % (str(msg)))
|
||||
|
||||
def send(self, proj: Union[YZProjection, TofZProjection], final=False):
|
||||
if final:
|
||||
state = 'FINISHED'
|
||||
else:
|
||||
state = 'COUNTING'
|
||||
if isinstance(proj, YZProjection):
|
||||
if self._active_histogram_yz is None:
|
||||
return
|
||||
suffix = 'YZ'
|
||||
message = HistogramMessage(
|
||||
source='amor-eos',
|
||||
timestamp=ktime(),
|
||||
current_shape=(proj.y.shape[0]-1, proj.z.shape[0]-1),
|
||||
dim_metadata=(
|
||||
DimMetadata(
|
||||
length=proj.y.shape[0]-1,
|
||||
unit="pixel",
|
||||
label="Y",
|
||||
bin_boundaries=proj.y,
|
||||
),
|
||||
DimMetadata(
|
||||
length=proj.z.shape[0]-1,
|
||||
unit="pixel",
|
||||
label="Z",
|
||||
bin_boundaries=proj.z,
|
||||
)
|
||||
),
|
||||
last_metadata_timestamp=0,
|
||||
data=proj.data.cts,
|
||||
errors=np.sqrt(proj.data.cts),
|
||||
info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": state,
|
||||
"num events": proj.data.cts.sum()
|
||||
})
|
||||
)
|
||||
logging.info(f" {state}: Sending {proj.data.cts.sum()} events to Nicos")
|
||||
elif isinstance(proj, TofZProjection):
|
||||
if self._active_histogram_tofz is None:
|
||||
return
|
||||
suffix = 'TofZ'
|
||||
message = HistogramMessage(
|
||||
source='amor-eos',
|
||||
timestamp=ktime(),
|
||||
current_shape=(proj.tof.shape[0]-1, proj.z.shape[0]-1),
|
||||
dim_metadata=(
|
||||
DimMetadata(
|
||||
length=proj.tof.shape[0]-1,
|
||||
unit="ms",
|
||||
label="ToF",
|
||||
bin_boundaries=proj.tof,
|
||||
),
|
||||
DimMetadata(
|
||||
length=proj.z.shape[0]-1,
|
||||
unit="pixel",
|
||||
label="Z",
|
||||
bin_boundaries=proj.z,
|
||||
),
|
||||
),
|
||||
last_metadata_timestamp=0,
|
||||
data=proj.data.cts,
|
||||
errors=np.sqrt(proj.data.cts),
|
||||
info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": state,
|
||||
"num events": proj.data.I.sum()
|
||||
})
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented")
|
||||
|
||||
self.producer.produce(value=message.serialize(),
|
||||
topic=KAFKA_TOPICS['histogram']+'_'+suffix,
|
||||
callback=self.acked)
|
||||
self.producer.flush()
|
||||
46
eos/nicos.py
Normal file
46
eos/nicos.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""
|
||||
events2histogram vizualising data from Amor@SINQ, PSI
|
||||
|
||||
Author: Jochen Stahn (algorithms, python draft),
|
||||
Artur Glavic (structuring and optimisation of code)
|
||||
"""
|
||||
import logging
|
||||
|
||||
# need to do absolute import here as pyinstaller requires it
|
||||
from eos.options import E2HConfig, ReaderConfig, ExperimentConfig, E2HReductionConfig
|
||||
from eos.command_line import commandLineArgs
|
||||
from eos.logconfig import setup_logging, update_loglevel
|
||||
|
||||
|
||||
def main():
|
||||
setup_logging()
|
||||
logging.getLogger('matplotlib').setLevel(logging.WARNING)
|
||||
|
||||
# read command line arguments and generate classes holding configuration parameters
|
||||
clas = commandLineArgs([ReaderConfig, ExperimentConfig, E2HReductionConfig],
|
||||
'amor-nicos')
|
||||
update_loglevel(clas.verbose)
|
||||
if clas.verbose<2:
|
||||
# only log info level in logfile
|
||||
logger = logging.getLogger() # logging.getLogger('quicknxs')
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
reader_config = ReaderConfig.from_args(clas)
|
||||
experiment_config = ExperimentConfig.from_args(clas)
|
||||
reduction_config = E2HReductionConfig.from_args(clas)
|
||||
config = E2HConfig(reader_config, experiment_config, reduction_config)
|
||||
|
||||
logging.warning('######## amor-nicos - Nicos histogram for Amor ########')
|
||||
from eos.reduction_kafka import KafkaReduction
|
||||
|
||||
# only import heavy module if sufficient command line parameters were provided
|
||||
from eos.reduction_reflectivity import ReflectivityReduction
|
||||
# Create reducer with these arguments
|
||||
reducer = KafkaReduction(config)
|
||||
# Perform actual reduction
|
||||
reducer.reduce()
|
||||
|
||||
logging.info('######## amor-nicos - finished ########')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -91,10 +91,12 @@ class ArgParsable:
|
||||
if get_origin(typ) is list:
|
||||
args['nargs'] = '+'
|
||||
typ = get_args(typ)[0]
|
||||
if get_origin(typ) is tuple:
|
||||
# tuple of items are put together during evaluation
|
||||
typ = get_args(typ)[0]
|
||||
elif get_origin(typ) is tuple:
|
||||
args['nargs'] = len(get_args(typ))
|
||||
typ = get_args(typ)[0]
|
||||
|
||||
if issubclass(typ, StrEnum):
|
||||
args['choices'] = [ci.value for ci in typ]
|
||||
if field.default is not MISSING:
|
||||
@@ -149,7 +151,12 @@ class ArgParsable:
|
||||
if get_origin(field.type) is Union and type(None) in get_args(field.type):
|
||||
# optional argument
|
||||
typ = get_args(field.type)[0]
|
||||
|
||||
if get_origin(typ) is list:
|
||||
item_typ = get_args(typ)[0]
|
||||
if get_origin(item_typ) is tuple:
|
||||
# tuple of items are put together during evaluation
|
||||
tuple_length = len(get_args(item_typ))
|
||||
value = [tuple(value[i*tuple_length+j] for j in range(tuple_length)) for i in range(len(value)//tuple_length)]
|
||||
if isinstance(typ, type) and issubclass(typ, StrEnum):
|
||||
# convert str to enum
|
||||
try:
|
||||
@@ -359,6 +366,14 @@ class ReflectivityReductionConfig(ArgParsable):
|
||||
'help': 'theta region of interest w.r.t. beam center',
|
||||
},
|
||||
)
|
||||
thetaFilters: List[Tuple[float, float]] = field(
|
||||
default_factory=lambda: [],
|
||||
metadata={
|
||||
'short': 'TF',
|
||||
'group': 'region of interest',
|
||||
'help': 'add one or more theta ranges that will be filtered in reduction',
|
||||
},
|
||||
)
|
||||
normalisationMethod: NormalisationMethod = field(
|
||||
default=NormalisationMethod.over_illuminated,
|
||||
metadata={
|
||||
@@ -640,6 +655,14 @@ class E2HReductionConfig(ArgParsable):
|
||||
},
|
||||
)
|
||||
|
||||
kafka: bool = field(
|
||||
default=False,
|
||||
metadata={
|
||||
'group': 'output',
|
||||
'help': 'send result to kafka for Nicos',
|
||||
},
|
||||
)
|
||||
|
||||
plotArgs: E2HPlotArguments = field(
|
||||
default=E2HPlotArguments.Default,
|
||||
metadata={
|
||||
@@ -701,6 +724,15 @@ class E2HReductionConfig(ArgParsable):
|
||||
},
|
||||
)
|
||||
|
||||
thetaFilters: List[Tuple[float, float]] = field(
|
||||
default_factory=lambda: [],
|
||||
metadata={
|
||||
'short': 'TF',
|
||||
'group': 'region of interest',
|
||||
'help': 'add one or more theta ranges that will be filtered in reduction',
|
||||
},
|
||||
)
|
||||
|
||||
fontsize: float = field(
|
||||
default=8.,
|
||||
metadata={
|
||||
|
||||
@@ -544,12 +544,12 @@ class TofZProjection(ProjectionInterface):
|
||||
('err', np.float64),
|
||||
])
|
||||
|
||||
def __init__(self, tau, foldback=False):
|
||||
def __init__(self, tau, foldback=False, combine=1):
|
||||
self.z = np.arange(Detector.nBlades*Detector.nWires+1)-0.5
|
||||
if foldback:
|
||||
self.tof = np.arange(0, tau, 0.0005)
|
||||
self.tof = np.arange(0, tau, 0.0005*combine)
|
||||
else:
|
||||
self.tof = np.arange(0, 2*tau, 0.0005)
|
||||
self.tof = np.arange(0, 2*tau, 0.0005*combine)
|
||||
self.data = np.zeros((self.tof.shape[0]-1, self.z.shape[0]-1), dtype=self._dtype).view(np.recarray)
|
||||
self.monitor = 0.
|
||||
|
||||
@@ -760,7 +760,7 @@ class TProjection(ProjectionInterface):
|
||||
|
||||
plt.xlabel('Reflection Angle / °')
|
||||
plt.ylabel('I / cpm')
|
||||
plt.xlim(self.theta[0], self.theta[-1])
|
||||
plt.xlim(self.theta[-1], self.theta[0])
|
||||
plt.title('Theta')
|
||||
|
||||
def update_plot(self):
|
||||
|
||||
@@ -37,6 +37,8 @@ class E2HReduction:
|
||||
|
||||
self.header = Header()
|
||||
|
||||
self.fig = plt.figure()
|
||||
self.register_colormap()
|
||||
self.prepare_actions()
|
||||
|
||||
def prepare_actions(self):
|
||||
@@ -47,7 +49,6 @@ class E2HReduction:
|
||||
self.file_list = self.path_resolver.resolve(self.config.reduction.fileIdentifier)
|
||||
self.file_index = 0
|
||||
self.plot_kwds = {}
|
||||
self.fig = plt.figure()
|
||||
plt.rcParams.update({'font.size': self.config.reduction.fontsize})
|
||||
|
||||
if self.config.reduction.update:
|
||||
@@ -92,7 +93,8 @@ class E2HReduction:
|
||||
|
||||
# plot dependant options
|
||||
if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.LT, E2HPlotSelection.Q]:
|
||||
self.grid = LZGrid(0.01, [0.0, 0.25])
|
||||
self.grid = LZGrid(0.05, [0.0, 0.25])
|
||||
self.grid.dldl = 0.05
|
||||
|
||||
if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.Raw,
|
||||
E2HPlotSelection.LT, E2HPlotSelection.YT,
|
||||
@@ -102,8 +104,6 @@ class E2HReduction:
|
||||
if self.config.reduction.plotArgs==E2HPlotArguments.Linear:
|
||||
self.plot_kwds['norm'] = None
|
||||
|
||||
self.register_colormap()
|
||||
|
||||
def reduce(self):
|
||||
if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.LT, E2HPlotSelection.Q]:
|
||||
if self.config.reduction.normalizationModel:
|
||||
@@ -125,6 +125,12 @@ class E2HReduction:
|
||||
if self.config.reduction.plotArgs==E2HPlotArguments.Default and not self.config.reduction.update:
|
||||
# safe to image file if not auto-updating graph
|
||||
plt.savefig(f'e2h_{self.config.reduction.plot}.png', dpi=300)
|
||||
if self.config.reduction.kafka:
|
||||
from .kafka_serializer import ESSSerializer
|
||||
self.serializer = ESSSerializer()
|
||||
self.fig.canvas.mpl_connect('close_event', self.serializer.end_command_thread)
|
||||
self.serializer.start_command_thread()
|
||||
self.serializer.send(self.projection)
|
||||
if self.config.reduction.update:
|
||||
self.timer = self.fig.canvas.new_timer(1000)
|
||||
self.timer.add_callback(self.update)
|
||||
@@ -132,8 +138,9 @@ class E2HReduction:
|
||||
if self.config.reduction.show_plot:
|
||||
plt.show()
|
||||
|
||||
|
||||
def register_colormap(self):
|
||||
cmap = plt.colormaps['gnuplot'](np.arange(256))
|
||||
cmap = plt.colormaps['turbo'](np.arange(256))
|
||||
cmap[:1, :] = np.array([256/256, 255/256, 236/256, 1])
|
||||
cmap = ListedColormap(cmap, name='jochen_deluxe', N=cmap.shape[0])
|
||||
#cmap.set_bad((1.,1.,0.9))
|
||||
@@ -155,6 +162,8 @@ class E2HReduction:
|
||||
self.projection.correct_gravity(last_file_header.geometry.detectorDistance)
|
||||
self.projection.apply_lamda_mask(self.config.experiment.lambdaRange)
|
||||
self.projection.apply_theta_mask(thetaRange)
|
||||
for thi in self.config.reduction.thetaFilters:
|
||||
self.projection.apply_theta_filter((thi[0]+tthh, thi[1]+tthh))
|
||||
self.projection.apply_norm_mask(self.norm)
|
||||
|
||||
if self.config.reduction.plot==E2HPlotSelection.Q:
|
||||
@@ -164,6 +173,8 @@ class E2HReduction:
|
||||
plz.calculate_q()
|
||||
plz.apply_lamda_mask(self.config.experiment.lambdaRange)
|
||||
plz.apply_theta_mask(thetaRange)
|
||||
for thi in self.config.reduction.thetaFilters:
|
||||
self.projection.apply_theta_filter((thi[0]+tthh, thi[1]+tthh))
|
||||
plz.apply_norm_mask(self.norm)
|
||||
self.projection = ReflectivityProjector(plz, self.norm)
|
||||
|
||||
@@ -192,6 +203,8 @@ class E2HReduction:
|
||||
plz.calculate_q()
|
||||
plz.apply_lamda_mask(self.config.experiment.lambdaRange)
|
||||
plz.apply_theta_mask(thetaRange)
|
||||
for thi in self.config.reduction.thetaFilters:
|
||||
plz.apply_theta_filter((thi[0]+tthh, thi[1]+tthh))
|
||||
plz.apply_norm_mask(self.norm)
|
||||
pr = ReflectivityProjector(plz, self.norm)
|
||||
pyz = YZProjection()
|
||||
@@ -227,7 +240,7 @@ class E2HReduction:
|
||||
self.projection.normalize_over_illuminated(self.norm)
|
||||
|
||||
def create_file_output(self):
|
||||
...
|
||||
raise NotImplementedError("Export to text output not yet implemented")
|
||||
|
||||
def create_title(self):
|
||||
output = "Events to Histogram - "
|
||||
@@ -247,14 +260,23 @@ class E2HReduction:
|
||||
new_files = self.path_resolver.resolve(f'{latest}')
|
||||
if not os.path.exists(new_files[-1]):
|
||||
return
|
||||
try:
|
||||
# check that events exist in the new file
|
||||
AmorEventData(new_files[-1], 0, max_events=1000)
|
||||
except Exception:
|
||||
logging.debug("Problem when trying to load new dataset", exc_info=True)
|
||||
return
|
||||
|
||||
logging.warning(f"Preceding to next file {latest}")
|
||||
self.file_list = new_files
|
||||
self.file_index = 0
|
||||
self.prepare_actions()
|
||||
self.prepare_graphs()
|
||||
self.read_data()
|
||||
self.projection.clear()
|
||||
self.add_data()
|
||||
self.fig.clear()
|
||||
self.create_graph()
|
||||
plt.draw()
|
||||
|
||||
def update(self):
|
||||
logging.debug(" check for update")
|
||||
@@ -284,4 +306,7 @@ class E2HReduction:
|
||||
|
||||
self.projection.update_plot()
|
||||
plt.suptitle(self.create_title())
|
||||
plt.draw()
|
||||
plt.draw()
|
||||
|
||||
if self.config.reduction.kafka:
|
||||
self.serializer.send(self.projection)
|
||||
|
||||
128
eos/reduction_kafka.py
Normal file
128
eos/reduction_kafka.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""
|
||||
Events 2 histogram, quick reduction of single file to display during experiment.
|
||||
Can be used as a live preview with automatic update when files are modified.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
from time import sleep
|
||||
from .kafka_events import KafkaEventData
|
||||
from .header import Header
|
||||
from .options import E2HConfig
|
||||
from . import event_handling as eh, event_analysis as ea
|
||||
from .projection import TofZProjection, YZProjection
|
||||
from .kafka_serializer import ESSSerializer
|
||||
|
||||
|
||||
class KafkaReduction:
|
||||
config: E2HConfig
|
||||
header: Header
|
||||
event_actions: eh.EventDataAction
|
||||
|
||||
_last_mtime = 0.
|
||||
proj_yz: YZProjection
|
||||
proj_tofz = TofZProjection
|
||||
|
||||
def __init__(self, config: E2HConfig):
|
||||
self.config = config
|
||||
|
||||
self.header = Header()
|
||||
self.event_data = KafkaEventData()
|
||||
self.event_data.start()
|
||||
|
||||
self.prepare_actions()
|
||||
|
||||
def prepare_actions(self):
|
||||
"""
|
||||
Does not do any actual reduction.
|
||||
"""
|
||||
# Actions on datasets not used for normalization
|
||||
self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset)
|
||||
self.event_actions |= eh.CorrectChopperPhase()
|
||||
self.event_actions |= ea.MergeFrames()
|
||||
self.event_actions |= eh.ApplyMask()
|
||||
|
||||
def reduce(self):
|
||||
self.create_projections()
|
||||
self.read_data()
|
||||
self.add_data()
|
||||
|
||||
self.serializer = ESSSerializer()
|
||||
self.serializer.start_command_thread()
|
||||
|
||||
self.loop()
|
||||
|
||||
def create_projections(self):
|
||||
self.proj_yz = YZProjection()
|
||||
self.proj_tofz = TofZProjection(self.event_data.timing.tau, foldback=True, combine=2)
|
||||
|
||||
def read_data(self):
|
||||
# make sure the first events have arrived before starting analysis
|
||||
self.event_data.new_events.wait()
|
||||
self.dataset = self.event_data.get_events()
|
||||
self.event_actions(self.dataset)
|
||||
|
||||
|
||||
def add_data(self):
|
||||
self.monitor = self.dataset.monitor
|
||||
self.proj_yz.project(self.dataset, monitor=self.monitor)
|
||||
self.proj_tofz.project(self.dataset, monitor=self.monitor)
|
||||
|
||||
def loop(self):
|
||||
self.wait_for = self.serializer.new_count_started
|
||||
while True:
|
||||
try:
|
||||
self.update()
|
||||
self.wait_for.wait(1.0)
|
||||
except KeyboardInterrupt:
|
||||
self.event_data.stop_event.set()
|
||||
self.event_data.join()
|
||||
self.serializer.end_command_thread()
|
||||
return
|
||||
|
||||
def update(self):
|
||||
if self.serializer.new_count_started.is_set():
|
||||
logging.warning('Start new count, clearing event data')
|
||||
self.wait_for = self.serializer.count_stopped
|
||||
self.event_data.restart()
|
||||
self.serializer.new_count_started.clear()
|
||||
self.create_projections()
|
||||
return
|
||||
elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set():
|
||||
return self.finish_count()
|
||||
try:
|
||||
update_data = self.event_data.get_events()
|
||||
except EOFError:
|
||||
return
|
||||
logging.info(" updating with new data")
|
||||
|
||||
self.event_actions(update_data)
|
||||
self.dataset=update_data
|
||||
self.monitor = self.dataset.monitor
|
||||
self.proj_yz.project(update_data, self.monitor)
|
||||
self.proj_tofz.project(update_data, self.monitor)
|
||||
|
||||
self.serializer.send(self.proj_yz)
|
||||
self.serializer.send(self.proj_tofz)
|
||||
|
||||
def finish_count(self):
|
||||
logging.debug(" stop event set, hold event collection and send final results")
|
||||
self.wait_for = self.serializer.new_count_started
|
||||
self.event_data.stop_counting.set()
|
||||
|
||||
try:
|
||||
update_data = self.event_data.get_events()
|
||||
except EOFError:
|
||||
pass
|
||||
else:
|
||||
self.event_actions(update_data)
|
||||
self.dataset = update_data
|
||||
self.monitor = self.dataset.monitor
|
||||
self.proj_yz.project(update_data, self.monitor)
|
||||
self.proj_tofz.project(update_data, self.monitor)
|
||||
|
||||
logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}')
|
||||
|
||||
self.serializer.send(self.proj_yz, final=True)
|
||||
self.serializer.send(self.proj_tofz, final=True)
|
||||
@@ -373,8 +373,8 @@ class ReflectivityReduction:
|
||||
proj = LZProjection.from_dataset(dataset, self.grid,
|
||||
has_offspecular=(self.config.experiment.incidentAngle!=IncidentAngle.alphaF))
|
||||
|
||||
t0 = dataset.geometry.nu-dataset.geometry.mu
|
||||
if not self.config.reduction.is_default('thetaRangeR'):
|
||||
t0 = dataset.geometry.nu - dataset.geometry.mu
|
||||
# adjust range based on detector center
|
||||
thetaRange = [ti+t0 for ti in self.config.reduction.thetaRangeR]
|
||||
proj.apply_theta_mask(thetaRange)
|
||||
@@ -384,6 +384,9 @@ class ReflectivityReduction:
|
||||
thetaRange = [dataset.geometry.nu - dataset.geometry.mu - dataset.geometry.div/2,
|
||||
dataset.geometry.nu - dataset.geometry.mu + dataset.geometry.div/2]
|
||||
proj.apply_theta_mask(thetaRange)
|
||||
for thi in self.config.reduction.thetaFilters:
|
||||
# apply theta filters relative to angle on detector (issues with parts of the incoming divergence)
|
||||
proj.apply_theta_filter((thi[0]+t0, thi[1]+t0))
|
||||
|
||||
proj.apply_lamda_mask(self.config.experiment.lambdaRange)
|
||||
|
||||
|
||||
42
nicos_config.md
Normal file
42
nicos_config.md
Normal file
@@ -0,0 +1,42 @@
|
||||
EOS-Service
|
||||
===========
|
||||
|
||||
EOS can be used as histogram service to send images to the Nicos instrument control software.
|
||||
For that you need to run it on the amor instrument computer:
|
||||
|
||||
```bash
|
||||
amor-nicos {-vv}
|
||||
```
|
||||
|
||||
The instrument config in Nicos needs to configure a Kafka JustBinItImage instance
|
||||
for each histogram that should be used:
|
||||
|
||||
```python
|
||||
hist_yz = device('nicos_sinq.devices.just_bin_it.JustBinItImage',
|
||||
description = 'Detector pixel histogram over all times',
|
||||
hist_topic = 'AMOR_histograms_YZ',
|
||||
data_topic = 'AMOR_detector',
|
||||
command_topic = 'AMOR_histCommands',
|
||||
brokers = ['linkafka01.psi.ch:9092'],
|
||||
unit = 'evts',
|
||||
hist_type = '2-D SANSLLB',
|
||||
det_width = 446,
|
||||
det_height = 64,
|
||||
),
|
||||
hist_tofz = device('nicos_sinq.devices.just_bin_it.JustBinItImage',
|
||||
description = 'Detector time of flight vs. z-pixel histogram over all y-values',
|
||||
hist_topic = 'AMOR_histograms_TofZ',
|
||||
data_topic = 'AMOR_detector',
|
||||
command_topic = 'AMOR_histCommands',
|
||||
brokers = ['linkafka01.psi.ch:9092'],
|
||||
unit = 'evts',
|
||||
hist_type = '2-D SANSLLB',
|
||||
det_width = 118,
|
||||
det_height = 446,
|
||||
),
|
||||
```
|
||||
|
||||
These images have then to be set in the detector configuration as _images_ items:
|
||||
```
|
||||
images=['hist_yz', 'hist_tofz'],
|
||||
```
|
||||
@@ -35,3 +35,4 @@ Homepage = "https://github.com/jochenstahn/amor"
|
||||
console_scripts =
|
||||
eos = eos.__main__:main
|
||||
events2histogram = eos.e2h:main
|
||||
amor-nicos = eos.nicos:main
|
||||
|
||||
16
update.md
Normal file
16
update.md
Normal file
@@ -0,0 +1,16 @@
|
||||
Make new release
|
||||
================
|
||||
|
||||
- Update revision in `eos/__init__.py`
|
||||
- Commit changes `git commit -a -m "your message here"`
|
||||
- Tag version `git tag v3.x.y`
|
||||
- Push changes `git push` and `git push --tags`
|
||||
- This should trigger the **Release** action on GitHub that builds a new version and uploads it to PyPI.
|
||||
|
||||
|
||||
Update on AMOR
|
||||
==============
|
||||
|
||||
- Login via SSH using the **amor** user.
|
||||
- Activate eos virtual environment `source /home/software/virtualenv/eosenv/bin/activate`
|
||||
- Update eos packge `pip install --upgrade amor-eos`
|
||||
Reference in New Issue
Block a user