"""
Collect AMOR detector events sent via Kafka.
"""

import logging
import numpy as np
from threading import Thread, Event
from time import time

from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, PC_TYPE

from uuid import uuid4
from streaming_data_types.eventdata_ev44 import EventData
from streaming_data_types.logdata_f144 import ExtractedLogData
from streaming_data_types import deserialise_f144, deserialise_ev44
from confluent_kafka import Consumer

from .header import Header


try:
    from streaming_data_types.utils import get_schema
except ImportError:
    from streaming_data_types.utils import _get_schema as get_schema


KAFKA_BROKER = 'linkafka01.psi.ch:9092'
AMOR_EVENTS = 'amor_ev44'
AMOR_NICOS = 'AMOR_nicosForwarder'


class KafkaFrozenData:
    """
    Snapshot of the event stream data from Kafka at a given time.

    Returned by KafkaEventData so that data processing and projections
    work on a fixed copy of the live stream.

    Implements EventDatasetProtocol.
    """
    geometry: AmorGeometry
    timing: AmorTiming
    data: AmorEventStream

    def __init__(self, geometry, timing, data, monitor=1.):
        # freeze the current instrument state together with the data;
        # monitor defaults to a unit weight
        self.geometry, self.timing, self.data = geometry, timing, data
        self.monitor = monitor

    def append(self, other):
        # a frozen snapshot of a live stream cannot be merged
        raise NotImplementedError("can't append live datastream to other event data")

    def update_header(self, header: Header):
        # maybe makes sense later, but for now just used for live visualization
        ...
+ """ + geometry: AmorGeometry + timing: AmorTiming + events: np.recarray + + def __init__(self): + self.stop_event = Event() + self.stop_counting = Event() + self.new_events = Event() + self.last_read = 0 + self.last_read_time = 0. + self.start_time = time() + self.consumer = Consumer( + {'bootstrap.servers': 'linkafka01.psi.ch:9092', + 'group.id': uuid4()}) + self.consumer.subscribe([AMOR_EVENTS, AMOR_NICOS]) + self.geometry = AmorGeometry(1.0, 2.0, 0., 0., 1.5, 10.0, 4.0, 10.0) + self.timing = AmorTiming(0., 0., 500., 0., 30./500.) + # create empty dataset + self.events = np.recarray(0, dtype=EVENT_TYPE) + super().__init__() + + def run(self): + while not self.stop_event.is_set(): + messages = self.consumer.consume(10, timeout=1) + for message in messages: + self.process_message(message) + + def process_message(self, message): + if message.error(): + logging.info(f" received Kafka message with error: {message.error()}") + return + schema = get_schema(message.value()) + if message.topic()==AMOR_EVENTS and schema=='ev44': + events:EventData = deserialise_ev44(message.value()) + self.add_events(events) + self.new_events.set() + logging.debug(f' new events {events}') + elif message.topic()==AMOR_NICOS and schema=='f144': + nicos_data:ExtractedLogData = deserialise_f144(message.value()) + if nicos_data.source_name in self.nicos_mapping.keys(): + logging.debug(f' {nicos_data.source_name} = {nicos_data.value}') + self.update_instrument(nicos_data) + + def add_events(self, events:EventData): + """ + Add new events to the Dataset. The object keeps raw events + and only copies the latest set to the self.data object, + this allows to run the event processing to be performed on a "clean" + evnet stream each time. 
+ """ + if self.stop_counting.is_set(): + return + prev_size = self.events.shape[0] + new_events = events.pixel_id.shape[0] + self.events.resize(prev_size+new_events, refcheck=False) + self.events.pixelID[prev_size:] = events.pixel_id + self.events.mask[prev_size:] = 0 + self.events.tof[prev_size:] = events.time_of_flight/1.e9 + + nicos_mapping = { + 'mu': ('geometry', 'mu'), + 'nu': ('geometry', 'nu'), + 'kappa': ('geometry', 'kap'), + 'kappa_offset': ('geometry', 'kad'), + 'ch1_trigger_phase': ('timing', 'ch1TriggerPhase'), + 'ch2_trigger_phase': ('timing', 'ch2TriggerPhase'), + 'ch2_speed': ('timing', 'chopperSpeed'), + 'chopper_phase': ('timing', 'chopperPhase'), + } + + def update_instrument(self, nicos_data:ExtractedLogData): + if nicos_data.source_name in self.nicos_mapping: + attr, subattr = self.nicos_mapping[nicos_data.source_name] + setattr(getattr(self, attr), subattr, nicos_data.value) + if nicos_data.source_name=='ch2_speed': + self.timing.tau = 30./self.timing.chopperSpeed + + def monitor(self): + return time()-self.start_time + + def restart(self): + # empty event buffer + self.events = np.recarray(0, dtype=EVENT_TYPE) + self.stop_counting.clear() + self.last_read = 0 + self.start_time = time() + self.new_events.clear() + + def get_events(self, total_counts=False): + packets = np.recarray(0, dtype=PACKET_TYPE) + pulses = np.recarray(0, dtype=PULSE_TYPE) + pc = np.recarray(0, dtype=PC_TYPE) + if total_counts: + last_read = 0 + else: + last_read = self.last_read + if last_read>=self.events.shape[0]: + raise EOFError("No new events arrived") + data = AmorEventStream(self.events[last_read:].copy(), packets, pulses, pc) + self.last_read = self.events.shape[0] + self.new_events.clear() + t_now = time() + monitor = t_now-self.last_read_time + self.last_read_time = t_now + return KafkaFrozenData(self.geometry, self.timing, data, monitor=monitor) diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py new file mode 100644 index 0000000..a1bf32b --- 
KAFKA_BROKER = 'linkafka01.psi.ch:9092'
KAFKA_TOPICS = {
    'histogram': 'AMOR_histograms',
    'response': 'AMOR_histResponse',
    'command': 'AMOR_histCommands'
    }


def ktime():
    """Current time as integer Kafka timestamp (milliseconds since epoch)."""
    return int(time()*1_000)


@dataclass
class DimMetadata:
    # metadata of one histogram axis as required by the hs01 schema
    length: int
    unit: str
    label: str
    bin_boundaries: np.ndarray


@dataclass
class HistogramMessage:
    # complete hs01 histogram message; see module docstring for an example
    source: str
    timestamp: int
    current_shape: Tuple[int, int]
    dim_metadata: Tuple[DimMetadata, DimMetadata]
    last_metadata_timestamp: int
    data: np.ndarray
    errors: np.ndarray
    info: str

    def serialize(self):
        """Return the hs01 flatbuffer representation of this message."""
        return histogram_hs01.serialise_hs01(asdict(self))


@dataclass
class CommandMessage:
    msg_id: str

    # sub-classes overwrite cmd with the command string they handle;
    # deliberately unannotated so it does not become a dataclass field here
    cmd = None

    @classmethod
    def get_message(cls, data):
        """
        Use the sub-class cmd attribute to select which message to return.

        Raises KeyError if data['cmd'] matches no known sub-class.
        """
        msg = {ci.cmd: ci for ci in cls.__subclasses__()}
        return msg[data['cmd']](**data)


@dataclass
class Stop(CommandMessage):
    # stop command for one active histogram
    hist_id: str
    id: str
    cmd: str = 'stop'


@dataclass
class HistogramConfig:
    # configuration of a single histogram as sent by Nicos/JustBinIt
    id: str
    type: str
    data_brokers: List[str]
    topic: str
    data_topics: List[str]
    tof_range: Tuple[float, float]
    det_range: Tuple[int, int]
    num_bins: int
    width: int
    height: int
    left_edges: list
    source: str


@dataclass
class ConfigureHistogram(CommandMessage):
    histograms: List[HistogramConfig]
    start: int
    cmd: str = 'config'

    def __post_init__(self):
        # JSON delivers plain dicts; promote them to HistogramConfig
        self.histograms = [HistogramConfig(**cfg) for cfg in self.histograms]


class ESSSerializer:
    """
    Send eos projections to Kafka as ESS hs01 histograms and react to
    JustBinIt-style configure/stop commands from Nicos.
    """

    def __init__(self):
        self.producer = Producer({
            'bootstrap.servers': KAFKA_BROKER,
            'message.max.bytes': 4_000_000,
        })
        self.consumer = Consumer({
            'bootstrap.servers': KAFKA_BROKER,
            "group.id": uuid4(),
            "default.topic.config": {"auto.offset.reset": "latest"},
        })
        self._active_histogram_yz = None
        self._active_histogram_tofz = None
        # count start timestamp; set once a config command arrives.
        # Initialized here so send() cannot hit an AttributeError when it is
        # called before any configuration was received.
        self._start = None
        # created here so end_command_thread() is safe even if the command
        # thread was never started (e.g. as matplotlib close callback)
        self._stop_receiving = Event()
        self._command_thread = None
        self.new_count_started = Event()
        self.count_stopped = Event()

        self.consumer.subscribe([KAFKA_TOPICS['command']])

    def process_message(self, message):
        """
        Decode one command message, ACK it and update the histogram state.
        """
        if message.error():
            logging.error("Command Consumer Error: %s", message.error())
            return
        command = json.loads(message.value().decode())
        try:
            command = CommandMessage.get_message(command)
        except Exception:
            logging.error(f'Could not interpret message: \n{command}', exc_info=True)
            return
        logging.info(command)
        # acknowledge the command towards Nicos before acting on it
        resp = json.dumps({
            "msg_id": getattr(command, "id", None) or command.msg_id,
            "response": "ACK",
            "message": ""
        })
        self.producer.produce(
            topic=KAFKA_TOPICS['response'],
            value=resp
        )
        self.producer.flush()
        if isinstance(command, Stop):
            # only the YZ histogram id is tracked for stop requests
            if command.hist_id == self._active_histogram_yz:
                self.count_stopped.set()
        elif isinstance(command, ConfigureHistogram):
            for hist in command.histograms:
                if hist.topic == KAFKA_TOPICS['histogram']+'_YZ':
                    self._active_histogram_yz = hist.id
                    logging.debug(f" histogram data_topic: {hist.data_topics}")
                    self._start = command.start
                    self.count_stopped.clear()
                    self.new_count_started.set()
                if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ':
                    self._active_histogram_tofz = hist.id

    def receive(self, timeout=5):
        """Poll one command message; return True if one was processed."""
        rec = self.consumer.poll(timeout)
        if rec is None:
            return False
        self.process_message(rec)
        return True

    def receive_loop(self):
        # background loop of the command thread
        while not self._stop_receiving.is_set():
            try:
                self.receive()
            except Exception:
                logging.error("Exception while receiving", exc_info=True)

    def start_command_thread(self):
        """Start listening for Nicos commands in a background thread."""
        self._stop_receiving = Event()
        self._command_thread = Thread(target=self.receive_loop)
        self._command_thread.start()

    def end_command_thread(self, event=None):
        # 'event' argument allows use as a matplotlib close_event callback
        self._stop_receiving.set()
        if self._command_thread is not None:
            self._command_thread.join()
            self._command_thread = None

    def acked(self, err, msg):
        # We need to have callback to produce-method to catch server errors
        if err is not None:
            logging.warning("Failed to deliver message: %s: %s" % (str(msg), str(err)))
        else:
            logging.debug("Message produced: %s" % (str(msg)))

    def send(self, proj: "Union[YZProjection, TofZProjection]", final=False):
        """
        Serialize a projection into an hs01 histogram and publish it.

        final=True marks the histogram state FINISHED instead of COUNTING.
        Silently returns if no matching histogram was configured yet.
        """
        state = 'FINISHED' if final else 'COUNTING'
        if isinstance(proj, YZProjection):
            if self._active_histogram_yz is None:
                return
            suffix = 'YZ'
            message = HistogramMessage(
                source='amor-eos',
                timestamp=ktime(),
                current_shape=(proj.y.shape[0]-1, proj.z.shape[0]-1),
                dim_metadata=(
                    DimMetadata(
                        length=proj.y.shape[0]-1,
                        unit="pixel",
                        label="Y",
                        bin_boundaries=proj.y,
                    ),
                    DimMetadata(
                        length=proj.z.shape[0]-1,
                        unit="pixel",
                        label="Z",
                        bin_boundaries=proj.z,
                    )
                ),
                last_metadata_timestamp=0,
                data=proj.data.cts,
                errors=np.sqrt(proj.data.cts),
                info=json.dumps({
                    "start": self._start,
                    "state": state,
                    "num events": proj.data.cts.sum()
                })
            )
            logging.info(f" {state}: Sending {proj.data.cts.sum()} events to Nicos")
        elif isinstance(proj, TofZProjection):
            if self._active_histogram_tofz is None:
                return
            suffix = 'TofZ'
            message = HistogramMessage(
                source='amor-eos',
                timestamp=ktime(),
                current_shape=(proj.tof.shape[0]-1, proj.z.shape[0]-1),
                dim_metadata=(
                    DimMetadata(
                        length=proj.tof.shape[0]-1,
                        unit="ms",
                        label="ToF",
                        bin_boundaries=proj.tof,
                    ),
                    DimMetadata(
                        length=proj.z.shape[0]-1,
                        unit="pixel",
                        label="Z",
                        bin_boundaries=proj.z,
                    ),
                ),
                last_metadata_timestamp=0,
                data=proj.data.cts,
                errors=np.sqrt(proj.data.cts),
                info=json.dumps({
                    "start": self._start,
                    "state": state,
                    # was proj.data.I.sum(): use cts for consistency with the
                    # data/errors payload and the YZ branch above
                    "num events": proj.data.cts.sum()
                })
            )
        else:
            raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented")

        self.producer.produce(value=message.serialize(),
                              topic=KAFKA_TOPICS['histogram']+'_'+suffix,
                              callback=self.acked)
        self.producer.flush()
def main():
    """
    Entry point of the amor-nicos console script: configure logging,
    parse command line options and run the Kafka based live reduction.
    """
    setup_logging()
    # matplotlib is very noisy on DEBUG level
    logging.getLogger('matplotlib').setLevel(logging.WARNING)

    # read command line arguments and generate classes holding configuration parameters
    clas = commandLineArgs([ReaderConfig, ExperimentConfig, E2HReductionConfig],
                           'amor-nicos')
    update_loglevel(clas.verbose)
    if clas.verbose<2:
        # only log info level in logfile
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)

    reader_config = ReaderConfig.from_args(clas)
    experiment_config = ExperimentConfig.from_args(clas)
    reduction_config = E2HReductionConfig.from_args(clas)
    config = E2HConfig(reader_config, experiment_config, reduction_config)

    logging.warning('######## amor-nicos - Nicos histogram for Amor ########')
    # only import heavy module if sufficient command line parameters were provided
    from eos.reduction_kafka import KafkaReduction

    # Create reducer with these arguments
    reducer = KafkaReduction(config)
    # Perform actual reduction
    reducer.reduce()

    logging.info('######## amor-nicos - finished ########')


if __name__ == '__main__':
    main()
class KafkaReduction:
    """
    Live reduction of AMOR Kafka events into YZ and ToF-Z histograms
    which are published back to Nicos through ESSSerializer.
    """
    config: "E2HConfig"
    header: "Header"
    event_actions: "eh.EventDataAction"

    proj_yz: "YZProjection"
    # fixed: was `proj_tofz = TofZProjection`, which assigned the class
    # object as a default value instead of declaring the attribute type
    proj_tofz: "TofZProjection"

    def __init__(self, config: "E2HConfig"):
        self.config = config

        self.header = Header()
        self.event_data = KafkaEventData()
        self.event_data.start()

        self.prepare_actions()

    def prepare_actions(self):
        """
        Build the chain of event corrections; does not do any actual reduction.
        """
        # Actions on datasets not used for normalization
        self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset)
        self.event_actions |= eh.CorrectChopperPhase()
        self.event_actions |= ea.MergeFrames()
        self.event_actions |= eh.ApplyMask()

    def reduce(self):
        """
        Main entry point: create projections, ingest first events and
        enter the update loop driven by Nicos commands.
        """
        self.create_projections()
        self.read_data()
        self.add_data()

        self.serializer = ESSSerializer()
        self.serializer.start_command_thread()

        self.loop()

    def create_projections(self):
        # fresh, empty histograms
        self.proj_yz = YZProjection()
        self.proj_tofz = TofZProjection(self.event_data.timing.tau, foldback=True, combine=2)

    def read_data(self):
        # make sure the first events have arrived before starting analysis
        self.event_data.new_events.wait()
        self.dataset = self.event_data.get_events()
        self.event_actions(self.dataset)

    def add_data(self):
        self.monitor = self.dataset.monitor
        self.proj_yz.project(self.dataset, monitor=self.monitor)
        self.proj_tofz.project(self.dataset, monitor=self.monitor)

    def _ingest(self, update_data):
        # apply corrections and accumulate one batch into both projections
        self.event_actions(update_data)
        self.dataset = update_data
        self.monitor = self.dataset.monitor
        self.proj_yz.project(update_data, self.monitor)
        self.proj_tofz.project(update_data, self.monitor)

    def loop(self):
        """
        Run until interrupted, waking up on serializer state changes or
        at least once per second.
        """
        self.wait_for = self.serializer.new_count_started
        while True:
            try:
                self.update()
                self.wait_for.wait(1.0)
            except KeyboardInterrupt:
                self.event_data.stop_event.set()
                self.event_data.join()
                self.serializer.end_command_thread()
                return

    def update(self):
        """
        Handle start/stop transitions and publish newly arrived events.
        """
        if self.serializer.new_count_started.is_set():
            logging.warning('Start new count, clearing event data')
            self.wait_for = self.serializer.count_stopped
            self.event_data.restart()
            self.serializer.new_count_started.clear()
            self.create_projections()
            return
        elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set():
            return self.finish_count()
        try:
            update_data = self.event_data.get_events()
        except EOFError:
            # nothing new arrived since the last update
            return
        logging.info(" updating with new data")

        self._ingest(update_data)

        self.serializer.send(self.proj_yz)
        self.serializer.send(self.proj_tofz)

    def finish_count(self):
        """
        Stop event collection, flush remaining events and send final results.
        """
        logging.debug(" stop event set, hold event collection and send final results")
        self.wait_for = self.serializer.new_count_started
        self.event_data.stop_counting.set()

        try:
            update_data = self.event_data.get_events()
        except EOFError:
            pass
        else:
            self._ingest(update_data)

        logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}')

        self.serializer.send(self.proj_yz, final=True)
        self.serializer.send(self.proj_tofz, final=True)
For that you need to run it on the amor instrument computer:

```bash
amor-nicos {-vv}
```

The instrument config in Nicos needs to configure a Kafka JustBinItImage instance
for each histogram that should be used:

```python
hist_yz = device('nicos_sinq.devices.just_bin_it.JustBinItImage',
    description = 'Detector pixel histogram over all times',
    hist_topic = 'AMOR_histograms_YZ',
    data_topic = 'AMOR_detector',
    command_topic = 'AMOR_histCommands',
    brokers = ['linkafka01.psi.ch:9092'],
    unit = 'evts',
    hist_type = '2-D SANSLLB',
    det_width = 446,
    det_height = 64,
    ),
hist_tofz = device('nicos_sinq.devices.just_bin_it.JustBinItImage',
    description = 'Detector time of flight vs. z-pixel histogram over all y-values',
    hist_topic = 'AMOR_histograms_TofZ',
    data_topic = 'AMOR_detector',
    command_topic = 'AMOR_histCommands',
    brokers = ['linkafka01.psi.ch:9092'],
    unit = 'evts',
    hist_type = '2-D SANSLLB',
    det_width = 118,
    det_height = 446,
    ),
```

These images then have to be set in the detector configuration as _images_ items:
```
images=['hist_yz', 'hist_tofz'],
```
diff --git a/setup.cfg b/setup.cfg
index 7a88dc9..2c9fa4a 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -35,3 +35,4 @@ Homepage = "https://github.com/jochenstahn/amor"
 console_scripts =
     eos = eos.__main__:main
     events2histogram = eos.e2h:main
+    amor-nicos = eos.nicos:main