add nicos daemon for YZ + TofZ histogramming of latest dataset
This commit is contained in:
+46
-19
@@ -130,8 +130,10 @@ class ESSSerializer:
|
||||
"group.id": uuid4(),
|
||||
"default.topic.config": {"auto.offset.reset": "latest"},
|
||||
})
|
||||
self._active_histogram = None
|
||||
self._last_message = None
|
||||
self._active_histogram_yz = None
|
||||
self._active_histogram_tofz = None
|
||||
self._last_message_yz = None
|
||||
self._last_message_tofz = None
|
||||
|
||||
self.consumer.subscribe([KAFKA_TOPICS['command']])
|
||||
|
||||
@@ -156,22 +158,41 @@ class ESSSerializer:
|
||||
value=resp
|
||||
)
|
||||
self.producer.flush()
|
||||
if isinstance(command, Stop) and command.hist_id == self._active_histogram:
|
||||
self._active_histogram = None
|
||||
message = self._last_message
|
||||
message.timestamp = ktime()
|
||||
message.info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": 'FINISHED',
|
||||
"num events": message.data.sum()
|
||||
})
|
||||
self._last_message = None
|
||||
if isinstance(command, Stop):
|
||||
if command.hist_id == self._active_histogram_yz:
|
||||
suffix = 'YZ'
|
||||
self._active_histogram_yz = None
|
||||
message = self._last_message_yz
|
||||
message.timestamp = ktime()
|
||||
message.info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": 'FINISHED',
|
||||
"num events": message.data.sum()
|
||||
})
|
||||
self._last_message_yz = None
|
||||
elif command.hist_id == self._active_histogram_tofz:
|
||||
suffix = 'TofZ'
|
||||
self._active_histogram_tofz = None
|
||||
message = self._last_message_tofz
|
||||
message.timestamp = ktime()
|
||||
message.info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": 'FINISHED',
|
||||
"num events": message.data.sum()
|
||||
})
|
||||
self._last_message_tofz = None
|
||||
else:
|
||||
return
|
||||
self.producer.produce(value=message.serialize(),
|
||||
topic=KAFKA_TOPICS['histogram'],
|
||||
topic=KAFKA_TOPICS['histogram']+'_'+suffix,
|
||||
callback=self.acked)
|
||||
self.producer.flush()
|
||||
elif isinstance(command, ConfigureHistogram):
|
||||
self._active_histogram = command.histograms[0].id
|
||||
for hist in command.histograms:
|
||||
if hist.topic == KAFKA_TOPICS['histogram']+'_YZ':
|
||||
self._active_histogram_yz = hist.id
|
||||
if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ':
|
||||
self._active_histogram_tofz = hist.id
|
||||
self._start = command.start
|
||||
|
||||
def receive(self, timeout=5):
|
||||
@@ -207,10 +228,11 @@ class ESSSerializer:
|
||||
logging.debug("Message produced: %s" % (str(msg)))
|
||||
|
||||
def send(self, proj: Union[YZProjection, TofZProjection]):
|
||||
if self._active_histogram is None:
|
||||
proj.clear()
|
||||
return
|
||||
if isinstance(proj, YZProjection):
|
||||
if self._active_histogram_yz is None:
|
||||
proj.clear()
|
||||
return
|
||||
suffix = 'YZ'
|
||||
message = HistogramMessage(
|
||||
source='amor-eos',
|
||||
timestamp=ktime(),
|
||||
@@ -238,7 +260,12 @@ class ESSSerializer:
|
||||
"num events": proj.data.cts.sum()
|
||||
})
|
||||
)
|
||||
self._last_message_yz = message
|
||||
elif isinstance(proj, TofZProjection):
|
||||
if self._active_histogram_tofz is None:
|
||||
proj.clear()
|
||||
return
|
||||
suffix = 'TofZ'
|
||||
message = HistogramMessage(
|
||||
source='amor-eos',
|
||||
timestamp=ktime(),
|
||||
@@ -266,11 +293,11 @@ class ESSSerializer:
|
||||
"num events": proj.data.I.sum()
|
||||
})
|
||||
)
|
||||
self._last_message_tofz = message
|
||||
else:
|
||||
raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented")
|
||||
|
||||
self._last_message = message
|
||||
self.producer.produce(value=message.serialize(),
|
||||
topic=KAFKA_TOPICS['histogram'],
|
||||
topic=KAFKA_TOPICS['histogram']+'_'+suffix,
|
||||
callback=self.acked)
|
||||
self.producer.flush()
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
"""
|
||||
events2histogram vizualising data from Amor@SINQ, PSI
|
||||
|
||||
Author: Jochen Stahn (algorithms, python draft),
|
||||
Artur Glavic (structuring and optimisation of code)
|
||||
"""
|
||||
import logging
|
||||
|
||||
# need to do absolute import here as pyinstaller requires it
|
||||
from eos.options import E2HConfig, ReaderConfig, ExperimentConfig, E2HReductionConfig
|
||||
from eos.command_line import commandLineArgs
|
||||
from eos.logconfig import setup_logging, update_loglevel
|
||||
|
||||
|
||||
def main():
|
||||
setup_logging()
|
||||
logging.getLogger('matplotlib').setLevel(logging.WARNING)
|
||||
|
||||
# read command line arguments and generate classes holding configuration parameters
|
||||
clas = commandLineArgs([ReaderConfig, ExperimentConfig, E2HReductionConfig],
|
||||
'events2histogram')
|
||||
update_loglevel(clas.verbose)
|
||||
|
||||
reader_config = ReaderConfig.from_args(clas)
|
||||
experiment_config = ExperimentConfig.from_args(clas)
|
||||
reduction_config = E2HReductionConfig.from_args(clas)
|
||||
config = E2HConfig(reader_config, experiment_config, reduction_config)
|
||||
|
||||
logging.warning('######## events2histogram - data vizualization for Amor ########')
|
||||
from eos.reduction_kafka import KafkaReduction
|
||||
|
||||
# only import heavy module if sufficient command line parameters were provided
|
||||
from eos.reduction_reflectivity import ReflectivityReduction
|
||||
# Create reducer with these arguments
|
||||
reducer = KafkaReduction(config)
|
||||
# Perform actual reduction
|
||||
reducer.reduce()
|
||||
|
||||
logging.info('######## events2histogram - finished ########')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -0,0 +1,125 @@
|
||||
"""
|
||||
Events 2 histogram, quick reduction of single file to display during experiment.
|
||||
Can be used as a live preview with automatic update when files are modified.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
from time import sleep
|
||||
from .file_reader import AmorEventData, AmorHeader
|
||||
from .header import Header
|
||||
from .options import E2HConfig
|
||||
from . import event_handling as eh, event_analysis as ea
|
||||
from .path_handling import PathResolver
|
||||
from .projection import TofZProjection, YZProjection
|
||||
from .kafka_serializer import ESSSerializer
|
||||
|
||||
|
||||
class KafkaReduction:
|
||||
config: E2HConfig
|
||||
header: Header
|
||||
event_actions: eh.EventDataAction
|
||||
|
||||
_last_mtime = 0.
|
||||
proj_yz: YZProjection
|
||||
proj_tofz = TofZProjection
|
||||
|
||||
def __init__(self, config: E2HConfig):
|
||||
self.config = config
|
||||
|
||||
self.header = Header()
|
||||
|
||||
self.prepare_actions()
|
||||
|
||||
def prepare_actions(self):
|
||||
"""
|
||||
Does not do any actual reduction.
|
||||
"""
|
||||
self.path_resolver = PathResolver(self.config.reader.year, self.config.reader.rawPath)
|
||||
self.current_file = self.path_resolver.resolve('0')[0]
|
||||
|
||||
# Actions on datasets not used for normalization
|
||||
self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset)
|
||||
self.event_actions |= eh.CorrectChopperPhase()
|
||||
self.event_actions |= ea.MergeFrames()
|
||||
self.event_actions |= eh.ApplyMask()
|
||||
|
||||
def reduce(self):
|
||||
last_file_header = AmorHeader(self.current_file)
|
||||
|
||||
self.proj_yz = YZProjection()
|
||||
self.proj_tofz = TofZProjection(last_file_header.timing.tau, foldback=True)
|
||||
|
||||
self.read_data()
|
||||
self.add_data()
|
||||
|
||||
self.serializer = ESSSerializer()
|
||||
self.serializer.start_command_thread()
|
||||
|
||||
self.loop()
|
||||
|
||||
|
||||
def read_data(self):
|
||||
self.dataset = AmorEventData(self.current_file, max_events=self.config.reduction.max_events)
|
||||
self.event_actions(self.dataset)
|
||||
|
||||
def add_data(self):
|
||||
self.monitor = self.dataset.data.pulses.monitor.sum()
|
||||
self.proj_yz.project(self.dataset, monitor=self.monitor)
|
||||
self.proj_tofz.project(self.dataset, monitor=self.monitor)
|
||||
|
||||
def replace_dataset(self, latest):
|
||||
new_file = self.path_resolver.resolve('0')[0]
|
||||
if not os.path.exists(new_file):
|
||||
return
|
||||
try:
|
||||
# check that events exist in the new file
|
||||
AmorEventData(new_file, 0, max_events=1000)
|
||||
except Exception:
|
||||
logging.debug("Problem when trying to load new dataset", exc_info=True)
|
||||
return
|
||||
|
||||
logging.warning(f"Preceding to next file {latest}")
|
||||
self.current_file = new_file
|
||||
self.read_data()
|
||||
self.add_data()
|
||||
|
||||
def loop(self):
|
||||
while True:
|
||||
try:
|
||||
self.update()
|
||||
sleep(1.0)
|
||||
except KeyboardInterrupt:
|
||||
self.serializer.end_command_thread()
|
||||
return
|
||||
|
||||
def update(self):
|
||||
logging.debug(" check for update")
|
||||
if self.config.reduction.fileIdentifier=='0':
|
||||
# if latest file was choosen, check if new one available and switch to it
|
||||
current = int(os.path.basename(self.current_file)[9:15])
|
||||
latest = self.path_resolver.search_latest(0)
|
||||
if latest>current:
|
||||
self.replace_dataset(latest)
|
||||
return
|
||||
# if all events were read last time, only load more if file was modified
|
||||
if self.dataset.EOF and os.path.getmtime(self.current_file)<=self._last_mtime:
|
||||
return
|
||||
|
||||
self._last_mtime = os.path.getmtime(self.current_file)
|
||||
try:
|
||||
update_data = AmorEventData(self.current_file, self.dataset.last_index+1,
|
||||
max_events=self.config.reduction.max_events)
|
||||
except EOFError:
|
||||
return
|
||||
logging.info(" updating with new data")
|
||||
|
||||
self.event_actions(update_data)
|
||||
self.dataset=update_data
|
||||
self.monitor = self.dataset.data.pulses.monitor.sum()
|
||||
self.proj_yz.project(update_data, self.monitor)
|
||||
self.proj_tofz.project(update_data, self.monitor)
|
||||
|
||||
self.serializer.send(self.proj_yz)
|
||||
self.serializer.send(self.proj_tofz)
|
||||
Reference in New Issue
Block a user