diff --git a/eos/kafka_serializer.py b/eos/kafka_serializer.py
index 42ef8b0..140e6fb 100644
--- a/eos/kafka_serializer.py
+++ b/eos/kafka_serializer.py
@@ -130,8 +130,10 @@ class ESSSerializer:
             "group.id": uuid4(),
             "default.topic.config": {"auto.offset.reset": "latest"},
         })
-        self._active_histogram = None
-        self._last_message = None
+        self._active_histogram_yz = None
+        self._active_histogram_tofz = None
+        self._last_message_yz = None
+        self._last_message_tofz = None
 
         self.consumer.subscribe([KAFKA_TOPICS['command']])
 
@@ -156,22 +158,41 @@
                     value=resp
                     )
             self.producer.flush()
-            if isinstance(command, Stop) and command.hist_id == self._active_histogram:
-                self._active_histogram = None
-                message = self._last_message
-                message.timestamp = ktime()
-                message.info=json.dumps({
-                    "start": self._start,
-                    "state": 'FINISHED',
-                    "num events": message.data.sum()
-                })
-                self._last_message = None
+            if isinstance(command, Stop):
+                if command.hist_id == self._active_histogram_yz:
+                    suffix = 'YZ'
+                    self._active_histogram_yz = None
+                    message = self._last_message_yz
+                    message.timestamp = ktime()
+                    message.info = json.dumps({
+                        "start": self._start,
+                        "state": 'FINISHED',
+                        "num events": message.data.sum()
+                    })
+                    self._last_message_yz = None
+                elif command.hist_id == self._active_histogram_tofz:
+                    suffix = 'TofZ'
+                    self._active_histogram_tofz = None
+                    message = self._last_message_tofz
+                    message.timestamp = ktime()
+                    message.info = json.dumps({
+                        "start": self._start,
+                        "state": 'FINISHED',
+                        "num events": message.data.sum()
+                    })
+                    self._last_message_tofz = None
+                else:
+                    return
                 self.producer.produce(value=message.serialize(),
-                                      topic=KAFKA_TOPICS['histogram'],
+                                      topic=KAFKA_TOPICS['histogram'] + '_' + suffix,
                                       callback=self.acked)
                 self.producer.flush()
             elif isinstance(command, ConfigureHistogram):
-                self._active_histogram = command.histograms[0].id
+                for hist in command.histograms:
+                    if hist.topic == KAFKA_TOPICS['histogram'] + '_YZ':
+                        self._active_histogram_yz = hist.id
+                    if hist.topic == KAFKA_TOPICS['histogram'] + '_TofZ':
+                        self._active_histogram_tofz = hist.id
                 self._start = command.start
 
     def receive(self, timeout=5):
@@ -207,10 +228,11 @@ class ESSSerializer:
         logging.debug("Message produced: %s" % (str(msg)))
 
     def send(self, proj: Union[YZProjection, TofZProjection]):
-        if self._active_histogram is None:
-            proj.clear()
-            return
         if isinstance(proj, YZProjection):
+            if self._active_histogram_yz is None:
+                proj.clear()
+                return
+            suffix = 'YZ'
             message = HistogramMessage(
                     source='amor-eos',
                     timestamp=ktime(),
@@ -238,7 +260,12 @@
                         "num events": proj.data.cts.sum()
                     })
                 )
+            self._last_message_yz = message
         elif isinstance(proj, TofZProjection):
+            if self._active_histogram_tofz is None:
+                proj.clear()
+                return
+            suffix = 'TofZ'
             message = HistogramMessage(
                     source='amor-eos',
                     timestamp=ktime(),
@@ -266,11 +293,11 @@
                         "num events": proj.data.I.sum()
                     })
                 )
+            self._last_message_tofz = message
         else:
             raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented")
-        self._last_message = message
         self.producer.produce(value=message.serialize(),
-                              topic=KAFKA_TOPICS['histogram'],
+                              topic=KAFKA_TOPICS['histogram'] + '_' + suffix,
                               callback=self.acked)
         self.producer.flush()
 
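The Stop branch added above repeats the same FINISHED-message construction for the YZ and TofZ histograms; only the topic suffix and the attribute pair differ. A possible consolidation is sketched below, as a method meant to live on ESSSerializer. It reuses the attribute names, ktime(), json, KAFKA_TOPICS and the acked callback from this diff, but the helper name _finish_histogram is an assumption, not part of the change set.

# Sketch only: possible de-duplication of the Stop handling in ESSSerializer.
# Assumes the module context of eos/kafka_serializer.py (json, ktime, KAFKA_TOPICS);
# the method name _finish_histogram is hypothetical.
def _finish_histogram(self, command):
    # Try both projections; each maps to a topic suffix and an attribute pair.
    for suffix, hist_attr, msg_attr in (
            ('YZ', '_active_histogram_yz', '_last_message_yz'),
            ('TofZ', '_active_histogram_tofz', '_last_message_tofz')):
        if command.hist_id != getattr(self, hist_attr):
            continue
        setattr(self, hist_attr, None)
        message = getattr(self, msg_attr)
        message.timestamp = ktime()
        message.info = json.dumps({
            "start": self._start,
            "state": 'FINISHED',
            "num events": message.data.sum()
        })
        setattr(self, msg_attr, None)
        self.producer.produce(value=message.serialize(),
                              topic=KAFKA_TOPICS['histogram'] + '_' + suffix,
                              callback=self.acked)
        self.producer.flush()
        return

The command loop would then call self._finish_histogram(command) for Stop commands instead of the two inlined branches.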
diff --git a/eos/nicos.py b/eos/nicos.py
new file mode 100644
index 0000000..37ea714
--- /dev/null
+++ b/eos/nicos.py
@@ -0,0 +1,42 @@
+"""
+events2histogram visualising data from Amor@SINQ, PSI
+
+Author: Jochen Stahn (algorithms, python draft),
+        Artur Glavic (structuring and optimisation of code)
+"""
+import logging
+
+# need to do absolute import here as pyinstaller requires it
+from eos.options import E2HConfig, ReaderConfig, ExperimentConfig, E2HReductionConfig
+from eos.command_line import commandLineArgs
+from eos.logconfig import setup_logging, update_loglevel
+
+
+def main():
+    setup_logging()
+    logging.getLogger('matplotlib').setLevel(logging.WARNING)
+
+    # read command line arguments and generate classes holding configuration parameters
+    clas = commandLineArgs([ReaderConfig, ExperimentConfig, E2HReductionConfig],
+                           'events2histogram')
+    update_loglevel(clas.verbose)
+
+    reader_config = ReaderConfig.from_args(clas)
+    experiment_config = ExperimentConfig.from_args(clas)
+    reduction_config = E2HReductionConfig.from_args(clas)
+    config = E2HConfig(reader_config, experiment_config, reduction_config)
+
+    logging.warning('######## events2histogram - data visualisation for Amor ########')
+    from eos.reduction_kafka import KafkaReduction
+
+    # only import heavy module if sufficient command line parameters were provided
+    from eos.reduction_reflectivity import ReflectivityReduction
+    # Create reducer with these arguments
+    reducer = KafkaReduction(config)
+    # Perform actual reduction
+    reducer.reduce()
+
+    logging.info('######## events2histogram - finished ########')
+
+if __name__ == '__main__':
+    main()
diff --git a/eos/reduction_kafka.py b/eos/reduction_kafka.py
new file mode 100644
index 0000000..dd5d0ae
--- /dev/null
+++ b/eos/reduction_kafka.py
@@ -0,0 +1,125 @@
+"""
+Events 2 histogram, quick reduction of single file to display during experiment.
+Can be used as a live preview with automatic update when files are modified.
+"""
+
+import logging
+import os
+
+from time import sleep
+from .file_reader import AmorEventData, AmorHeader
+from .header import Header
+from .options import E2HConfig
+from . import event_handling as eh, event_analysis as ea
+from .path_handling import PathResolver
+from .projection import TofZProjection, YZProjection
+from .kafka_serializer import ESSSerializer
+
+
+class KafkaReduction:
+    config: E2HConfig
+    header: Header
+    event_actions: eh.EventDataAction
+
+    _last_mtime = 0.
+    proj_yz: YZProjection
+    proj_tofz: TofZProjection
+
+    def __init__(self, config: E2HConfig):
+        self.config = config
+
+        self.header = Header()
+
+        self.prepare_actions()
+
+    def prepare_actions(self):
+        """
+        Prepare the chain of event handling actions; does not do any actual reduction.
+        """
+        self.path_resolver = PathResolver(self.config.reader.year, self.config.reader.rawPath)
+        self.current_file = self.path_resolver.resolve('0')[0]
+
+        # Actions on datasets not used for normalization
+        self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset)
+        self.event_actions |= eh.CorrectChopperPhase()
+        self.event_actions |= ea.MergeFrames()
+        self.event_actions |= eh.ApplyMask()
+
+    def reduce(self):
+        last_file_header = AmorHeader(self.current_file)
+
+        self.proj_yz = YZProjection()
+        self.proj_tofz = TofZProjection(last_file_header.timing.tau, foldback=True)
+
+        self.read_data()
+        self.add_data()
+
+        self.serializer = ESSSerializer()
+        self.serializer.start_command_thread()
+
+        self.loop()
+
+
+    def read_data(self):
+        self.dataset = AmorEventData(self.current_file, max_events=self.config.reduction.max_events)
+        self.event_actions(self.dataset)
+
+    def add_data(self):
+        self.monitor = self.dataset.data.pulses.monitor.sum()
+        self.proj_yz.project(self.dataset, monitor=self.monitor)
+        self.proj_tofz.project(self.dataset, monitor=self.monitor)
+
+    def replace_dataset(self, latest):
+        new_file = self.path_resolver.resolve('0')[0]
+        if not os.path.exists(new_file):
+            return
+        try:
+            # check that events exist in the new file
+            AmorEventData(new_file, 0, max_events=1000)
+        except Exception:
+            logging.debug("Problem when trying to load new dataset", exc_info=True)
+            return
+
+        logging.warning(f"Proceeding to next file {latest}")
+        self.current_file = new_file
+        self.read_data()
+        self.add_data()
+
+    def loop(self):
+        while True:
+            try:
+                self.update()
+                sleep(1.0)
+            except KeyboardInterrupt:
+                self.serializer.end_command_thread()
+                return
+
+    def update(self):
+        logging.debug(" check for update")
+        if self.config.reduction.fileIdentifier == '0':
+            # if the latest file was chosen, check whether a new one is available and switch to it
+            current = int(os.path.basename(self.current_file)[9:15])
+            latest = self.path_resolver.search_latest(0)
+            if latest > current:
+                self.replace_dataset(latest)
+                return
+        # if all events were read last time, only load more if the file was modified
+        if self.dataset.EOF and os.path.getmtime(self.current_file) <= self._last_mtime:
+            return
+
+        self._last_mtime = os.path.getmtime(self.current_file)
+        try:
+            update_data = AmorEventData(self.current_file, self.dataset.last_index+1,
+                                        max_events=self.config.reduction.max_events)
+        except EOFError:
+            return
+        logging.info(" updating with new data")
+
+        self.event_actions(update_data)
+        self.dataset = update_data
+        self.monitor = self.dataset.data.pulses.monitor.sum()
+        self.proj_yz.project(update_data, self.monitor)
+        self.proj_tofz.project(update_data, self.monitor)
+
+        self.serializer.send(self.proj_yz)
+        self.serializer.send(self.proj_tofz)
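To spot-check the two new histogram topics while commissioning this change, a minimal consumer can help. The sketch below assumes confluent_kafka (the client library the ESSSerializer configuration above appears to target), a broker at localhost:9092 (an assumption; adjust to the actual Amor broker), and that KAFKA_TOPICS is importable from eos.kafka_serializer; decoding of the serialized HistogramMessage payload is deliberately left out.

# Sketch only: subscribe to the suffixed histogram topics introduced in this diff
# and report the size of each serialized message.
from uuid import uuid4

from confluent_kafka import Consumer  # assumption: same client library as ESSSerializer

from eos.kafka_serializer import KAFKA_TOPICS

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumption: adjust to the Amor broker
    "group.id": str(uuid4()),
    "auto.offset.reset": "latest",
})
consumer.subscribe([KAFKA_TOPICS['histogram'] + '_YZ',
                    KAFKA_TOPICS['histogram'] + '_TofZ'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print("consumer error:", msg.error())
            continue
        print(f"{msg.topic()}: {len(msg.value())} bytes")
finally:
    consumer.close()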