40 Commits

Author SHA1 Message Date
Jochen Stahn
01873d60db Merge pull request #9 from jochenstahn/new_filewriter
Some checks failed
Release / build-ubuntu-latest (push) Failing after 2s
Release / build-windows (push) Has been cancelled
Release / release (push) Has been cancelled
New filewriter
2026-02-20 09:47:00 +01:00
654251a194 new version 2026-02-20 09:43:08 +01:00
9e30970d9b another new and everlasting cache path 2026-02-20 09:41:20 +01:00
7b6f2045cc changed topic names, adaption to new hdf format 2026-02-05 09:11:23 +01:00
e3d15a1142 changed topic names 2026-02-02 17:25:29 +01:00
Jochen Stahn
98244327eb Merge pull request #8 from jochenstahn/name
output name made up from sample name, mu and date
2026-01-20 14:07:07 +01:00
61ba1cf084 output name checking for default rather than string in code 2026-01-20 14:01:04 +01:00
54f2169888 changed cache path to /home/amor/nicosdata/cache 2026-01-20 08:08:10 +01:00
24cc0a4287 output name made up from sample name, mu and date 2026-01-09 10:23:35 +01:00
a3d342e5d7 empty model is no longer written to .ort file 2025-12-11 09:12:28 +01:00
c92e4e4d17 Automatic generation of call string from options, fix some defaults 2025-12-02 16:21:39 +01:00
4d74237669 Add readout of magnetic field and temperature to header information 2025-12-02 13:02:59 +01:00
6144200551 Fix raw file entries in header 2025-12-02 12:58:54 +01:00
f2c681ffba Fix wrong squaring of error, as the q-resolution is not an error propagation 2025-12-01 15:28:12 +01:00
a56f29191d Re-write qz projection to work with 0-count norm bins, don't filter norm by counts as default, optional smoothing of norm, fix dQ calculation 2025-12-01 15:11:49 +01:00
6597ca22d1 Bump version for bugfix release
2025-12-01 09:46:30 +01:00
ce31dcb190 Fix overillumination normalization using gravity corrected angle for data but uncorrected for normalization 2025-12-01 09:44:48 +01:00
41c5218413 fixed output format of orso model 2025-11-25 15:17:31 +01:00
8abd977656 release with -m functionality and better lambda resolution in e2h
2025-11-19 13:29:49 +01:00
432d85c9b3 release with -m functionality and better lambda resolution in e2h 2025-11-19 13:25:38 +01:00
c222f42a89 screen output format and -m/-mu included in e2h 2025-11-19 13:21:10 +01:00
fbced41b9f Allow expanding the wavelength range in events2histogram
2025-11-04 15:03:00 +01:00
dfb5aa319f Allow wavelength filtering for raw plots in events2histogram
2025-11-04 14:24:25 +01:00
c073fae269 update version and add update help file
2025-10-28 12:40:35 +01:00
447b81d09d Merge branch 'kafka'
# Conflicts:
#	eos/file_reader.py
2025-10-28 12:32:26 +01:00
003a8c04ce Nicos service just to log info by default 2025-10-27 08:12:58 +01:00
4cd25dec3d Recreate projection on counting start accounting for possible changes in tau 2025-10-27 08:08:52 +01:00
b38eeb8eb3 Fix kafka event ToV value and send actual final counts after stop signal 2025-10-23 17:22:48 +02:00
d722228079 Implement live Kafka event processing with Nicos start/stop 2025-10-23 15:40:50 +02:00
493c2bf802 changed quotation marks in split 2025-10-23 10:11:02 +02:00
c429429dd2 minor 2025-10-23 08:27:33 +02:00
0550e725e8 Merge remote-tracking branch 'origin/main' into kafka
# Conflicts:
#	eos/file_reader.py
2025-10-23 08:12:52 +02:00
5c8b9a8cd6 changed logging level and format for file names 2025-10-22 14:59:16 +02:00
4e5d085ad7 Add option to bin tof values, rebuild projections on file change, switch to hs01 serialization and give nicos config help 2025-10-15 10:39:59 +02:00
2d8d1c66c6 Merge remote-tracking branch 'origin/main' into kafka 2025-10-15 09:59:28 +02:00
14ea89e243 Do only clear projections when switching to new file 2025-10-14 15:49:06 +02:00
322c00ca54 add nicos daemon for YZ + TofZ histogramming of latest dataset 2025-10-14 15:28:32 +02:00
0b7a56628f Transfer to NICOS working for YZ and TofZ projections 2025-10-14 14:48:49 +02:00
77641412ab corrected kad to kappa_offset 2025-10-14 14:48:45 +02:00
fc1bd66c3d Start implementing kafka serializer to send image to Nicos, works without control messages for YZ graph 2025-10-13 17:44:52 +02:00
16 changed files with 927 additions and 145 deletions


@@ -2,5 +2,5 @@
Package to handle data reduction at AMOR instrument to be used by __main__.py script.
"""
__version__ = '3.0.2'
__date__ = '2025-10-10'
__version__ = '3.1.0'
__date__ = '2026-02-20'


@@ -25,8 +25,15 @@ class ExtractWalltime(EventDataAction):
dataset.data.events = new_events
class MergeFrames(EventDataAction):
def __init__(self, lamdaCut=None):
self.lamdaCut=lamdaCut
def perform_action(self, dataset: EventDatasetProtocol)->None:
tofCut = const.lamdaCut*dataset.geometry.chopperDetectorDistance/const.hdm*1e-13
if self.lamdaCut is None:
lamdaCut = const.lamdaCut
else:
lamdaCut = self.lamdaCut
tofCut = lamdaCut*dataset.geometry.chopperDetectorDistance/const.hdm*1e-13
total_offset = (tofCut +
dataset.timing.tau * (dataset.timing.ch1TriggerPhase + dataset.timing.chopperPhase/2)/180)
dataset.data.events.tof = merge_frames(dataset.data.events.tof, tofCut, dataset.timing.tau, total_offset)
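The wavelength-cutoff to time-of-flight conversion used by MergeFrames can be sketched standalone. `HDM` below stands in for `const.hdm` (Planck constant over neutron mass); its value and the `1e-13` unit factor are assumptions taken from the expression above, not checked against the eos constants module:

```python
HDM = 3.956e-7  # h/m_n in m^2/s (illustrative stand-in for const.hdm)

def tof_cut(lamda_cut, chopper_detector_distance):
    """Arrival-time cutoff corresponding to the wavelength cutoff,
    mirroring the tofCut expression in MergeFrames.perform_action."""
    return lamda_cut * chopper_detector_distance / HDM * 1e-13
```

A larger cutoff wavelength or a longer flight path both shift the cutoff to later times, which is what the frame merging relies on.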


@@ -29,7 +29,7 @@ except ImportError:
AMOR_LOCAL_TIMEZONE = zoneinfo.ZoneInfo(key='Europe/Zurich')
if platform.node().startswith('amor'):
NICOS_CACHE_DIR = '/home/amor/nicosdata/amor/cache/'
NICOS_CACHE_DIR = '/home/data/nicosdata/cache/'
GREP = '/usr/bin/grep "value"'
else:
NICOS_CACHE_DIR = None
@@ -41,7 +41,7 @@ class AmorHeader:
def __init__(self, fileName:Union[str, h5py.File, BinaryIO]):
if type(fileName) is str:
logging.debug(f' opening file {fileName}')
logging.warning(f' {fileName.split("/")[-1]}')
self.hdf = h5py.File(fileName, 'r', swmr=True)
elif type(fileName) is h5py.File:
self.hdf = fileName
@@ -74,25 +74,32 @@ class AmorHeader:
logging.warning(f" parameter {key} not found, replaced by zero")
return dtype(0)
def get_hdf_single_entry(self, path):
if not np.shape(self.hdf['entry1/title']):
return self.hdf[path][()].decode('utf-8')
else:
# format until 2025
return self.hdf[path][0].decode('utf-8')
def read_header_info(self):
# read general information and first data set
title = self.hdf['entry1/title'][0].decode('utf-8')
proposal_id = self.hdf['entry1/proposal_id'][0].decode('utf-8')
user_name = self.hdf['entry1/user/name'][0].decode('utf-8')
title = self.get_hdf_single_entry('entry1/title')
proposal_id = self.get_hdf_single_entry('entry1/proposal_id')
user_name = self.get_hdf_single_entry('entry1/user/name')
user_affiliation = 'unknown'
user_email = self.hdf['entry1/user/email'][0].decode('utf-8')
user_email = self.get_hdf_single_entry('entry1/user/email')
user_orcid = None
sampleName = self.hdf['entry1/sample/name'][0].decode('utf-8')
model = self.hdf['entry1/sample/model'][0].decode('utf-8')
if 'stack:' in model:
sampleName = self.get_hdf_single_entry('entry1/sample/name')
instrumentName = 'Amor'
source = self.get_hdf_single_entry('entry1/Amor/source/name')
sourceProbe = 'neutron'
model = self.get_hdf_single_entry('entry1/sample/model')
if 'stack' in model:
import yaml
model = yaml.safe_load(model)
else:
model = dict(stack=model)
instrumentName = 'Amor'
source = self.hdf['entry1/Amor/source/name'][0].decode('utf-8')
sourceProbe = 'neutron'
start_time = self.hdf['entry1/start_time'][0].decode('utf-8')
start_time = self.get_hdf_single_entry('entry1/start_time')
# extract start time as unix time, adding UTC offset of 1h to time string
start_date = datetime.fromisoformat(start_time)
self.fileDate = start_date.replace(tzinfo=AMOR_LOCAL_TIMEZONE)
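The two HDF string layouts that get_hdf_single_entry handles (scalar datasets in new files, one-element arrays in files until 2025) can be illustrated without h5py; plain numpy objects stand in for the dataset handles:

```python
import numpy as np

def single_entry(value):
    """Decode a string entry stored either as a scalar (new format)
    or as a one-element array (format until 2025); a sketch of
    AmorHeader.get_hdf_single_entry using plain numpy objects."""
    if not np.shape(value):
        # new format: scalar byte-string dataset
        return bytes(value).decode('utf-8')
    # format until 2025: one-element array of byte strings
    return value[0].decode('utf-8')
```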
@@ -113,11 +120,23 @@ class AmorHeader:
facility=source,
proposalID=proposal_id
)
if model['stack'] == '':
om = None
else:
om = SampleModel.from_dict(model)
self.sample = fileio.Sample(
name=sampleName,
model=SampleModel.from_dict(model),
sample_parameters=None,
model=om,
sample_parameters={},
)
# while event times are not evaluated, use average_value reported in file for SEE
if self.hdf['entry1/sample'].get('temperature', None) is not None \
and float(self.hdf['entry1/sample/temperature/average_value'][0])>0:
self.sample.sample_parameters['temperature'] = fileio.Value(
float(self.hdf['entry1/sample/temperature/average_value'][0]), unit='K')
if self.hdf['entry1/sample'].get('magnetic_field', None) is not None:
self.sample.sample_parameters['magnetic_field'] = fileio.Value(
float(self.hdf['entry1/sample/magnetic_field/average_value'][0]), unit='T')
def read_instrument_configuration(self):
chopperSeparation = float(np.take(self.hdf['entry1/Amor/chopper/pair_separation'], 0))
@@ -129,7 +148,7 @@ class AmorHeader:
mu = self._replace_if_missing('instrument_control_parameters/mu', 'mu', float)
nu = self._replace_if_missing('instrument_control_parameters/nu', 'nu', float)
kap = self._replace_if_missing('instrument_control_parameters/kappa', 'kappa', float)
kad = self._replace_if_missing('instrument_control_parameters/kappa_offset', 'kad', float)
kad = self._replace_if_missing('instrument_control_parameters/kappa_offset', 'kappa_offset', float)
div = self._replace_if_missing('instrument_control_parameters/div', 'div', float)
ch1TriggerPhase = self._replace_if_missing('chopper/ch1_trigger_phase', 'ch1_trigger_phase', float)
ch2TriggerPhase = self._replace_if_missing('chopper/ch2_trigger_phase', 'ch2_trigger_phase', float)
@@ -163,9 +182,11 @@ class AmorHeader:
round(mu+kap+kad+0.5*div, 3),
'deg'),
wavelength = fileio.ValueRange(const.lamdaCut, const.lamdaMax, 'angstrom'),
#polarization = fileio.Polarization.unpolarized,
polarization = fileio.Polarization(polarizationConfig)
)
self.instrument_settings.qz = fileio.ValueRange(round(4*np.pi*np.sin(np.deg2rad(mu+kap+kad-0.5*div))/const.lamdaMax, 3),
round(4*np.pi*np.sin(np.deg2rad(mu+kap+kad+0.5*div))/const.lamdaCut, 3),
'1/angstrom')
self.instrument_settings.mu = fileio.Value(
round(mu, 3),
'deg',
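The qz range added above follows the standard specular momentum-transfer relation; a minimal standalone check:

```python
import numpy as np

def qz(theta_deg, lamda):
    """q_z = 4*pi*sin(theta)/lambda, in 1/angstrom for lambda in
    angstrom -- the same expression used for instrument_settings.qz."""
    return 4 * np.pi * np.sin(np.deg2rad(theta_deg)) / lamda
```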
@@ -223,7 +244,7 @@ class AmorEventData(AmorHeader):
def __init__(self, fileName:Union[str, h5py.File, BinaryIO], first_index:int=0, max_events:int=100_000_000):
if type(fileName) is str:
logging.debug(f' opening file {fileName}')
logging.warning(f' {fileName.split("/")[-1]}')
self.file_list = [fileName]
hdf = h5py.File(fileName, 'r', swmr=True)
elif type(fileName) is h5py.File:


@@ -65,9 +65,14 @@ class LZGrid:
def qzRange(self):
return self._qzRange
def __init__(self, qResolution, qzRange):
def __init__(self, qResolution, qzRange, lambda_overwrite=None):
self._qResolution = qResolution
self._qzRange = qzRange
if lambda_overwrite is None:
self.lamdaMax = const.lamdaMax
self.lamdaCut = const.lamdaCut
else:
self.lamdaCut, self.lamdaMax = lambda_overwrite
@property
@cache
@@ -92,8 +97,8 @@ class LZGrid:
@cache
def lamda(self):
lamdaMax = 16
lamdaMin = const.lamdaCut
lamdaMax = self.lamdaMax
lamdaMin = self.lamdaCut
lamda_grid = lamdaMin*(1+self.dldl)**np.arange(int(np.log(lamdaMax/lamdaMin)/np.log(1+self.dldl)+1))
return lamda_grid
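The grid built in LZGrid.lamda is geometric, i.e. it has constant relative resolution dλ/λ; a standalone sketch of the same construction:

```python
import numpy as np

def lamda_grid(lamda_min, lamda_max, dldl):
    """Wavelength grid with constant relative bin width dldl,
    mirroring LZGrid.lamda above."""
    n = int(np.log(lamda_max / lamda_min) / np.log(1 + dldl) + 1)
    return lamda_min * (1 + dldl) ** np.arange(n)
```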

eos/kafka_events.py Normal file

@@ -0,0 +1,165 @@
"""
Collect AMOR detector events sent via Kafka.
"""
import logging
import numpy as np
from threading import Thread, Event
from time import time
from .event_data_types import AmorGeometry, AmorTiming, AmorEventStream, PACKET_TYPE, EVENT_TYPE, PULSE_TYPE, PC_TYPE
from uuid import uuid4
from streaming_data_types.eventdata_ev44 import EventData
from streaming_data_types.logdata_f144 import ExtractedLogData
from streaming_data_types import deserialise_f144, deserialise_ev44
from confluent_kafka import Consumer
from .header import Header
try:
from streaming_data_types.utils import get_schema
except ImportError:
from streaming_data_types.utils import _get_schema as get_schema
KAFKA_BROKER = 'linkafka01.psi.ch:9092'
AMOR_EVENTS = 'amor_detector'
AMOR_NICOS = 'amor_nicosForwarder'
class KafkaFrozenData:
"""
Represents event stream data from Kafka at a given time.
Will be returned by KafkaEventData to be used in conjunction
with data processing and projections.
Implements EventDatasetProtocol
"""
geometry: AmorGeometry
timing: AmorTiming
data: AmorEventStream
def __init__(self, geometry, timing, data, monitor=1.):
self.geometry = geometry
self.timing = timing
self.data = data
self.monitor = monitor
def append(self, other):
raise NotImplementedError("can't append live datastream to other event data")
def update_header(self, header:Header):
# maybe makes sense later, but for now just used for live visualization
...
class KafkaEventData(Thread):
"""
Read Nicos information and events from Kafka. Creates a background
thread that listens to Kafka events and converts them to eos compatible information.
"""
geometry: AmorGeometry
timing: AmorTiming
events: np.recarray
def __init__(self):
self.stop_event = Event()
self.stop_counting = Event()
self.new_events = Event()
self.last_read = 0
self.last_read_time = 0.
self.start_time = time()
self.consumer = Consumer(
{'bootstrap.servers': 'linkafka01.psi.ch:9092',
'group.id': uuid4()})
self.consumer.subscribe([AMOR_EVENTS, AMOR_NICOS])
self.geometry = AmorGeometry(1.0, 2.0, 0., 0., 1.5, 10.0, 4.0, 10.0)
self.timing = AmorTiming(0., 0., 500., 0., 30./500.)
# create empty dataset
self.events = np.recarray(0, dtype=EVENT_TYPE)
super().__init__()
def run(self):
while not self.stop_event.is_set():
messages = self.consumer.consume(10, timeout=1)
for message in messages:
self.process_message(message)
def process_message(self, message):
if message.error():
logging.info(f" received Kafka message with error: {message.error()}")
return
schema = get_schema(message.value())
if message.topic()==AMOR_EVENTS and schema=='ev44':
events:EventData = deserialise_ev44(message.value())
self.add_events(events)
self.new_events.set()
logging.debug(f' new events {events}')
elif message.topic()==AMOR_NICOS and schema=='f144':
nicos_data:ExtractedLogData = deserialise_f144(message.value())
if nicos_data.source_name in self.nicos_mapping.keys():
logging.debug(f' {nicos_data.source_name} = {nicos_data.value}')
self.update_instrument(nicos_data)
def add_events(self, events:EventData):
"""
Add new events to the dataset. The object keeps raw events
and only copies the latest set to the self.data object;
this allows the event processing to be performed on a "clean"
event stream each time.
"""
if self.stop_counting.is_set():
return
prev_size = self.events.shape[0]
new_events = events.pixel_id.shape[0]
self.events.resize(prev_size+new_events, refcheck=False)
self.events.pixelID[prev_size:] = events.pixel_id
self.events.mask[prev_size:] = 0
self.events.tof[prev_size:] = events.time_of_flight/1.e9
nicos_mapping = {
'mu': ('geometry', 'mu'),
'nu': ('geometry', 'nu'),
'kappa': ('geometry', 'kap'),
'kappa_offset': ('geometry', 'kad'),
'ch1_trigger_phase': ('timing', 'ch1TriggerPhase'),
'ch2_trigger_phase': ('timing', 'ch2TriggerPhase'),
'ch2_speed': ('timing', 'chopperSpeed'),
'chopper_phase': ('timing', 'chopperPhase'),
}
def update_instrument(self, nicos_data:ExtractedLogData):
if nicos_data.source_name in self.nicos_mapping:
attr, subattr = self.nicos_mapping[nicos_data.source_name]
setattr(getattr(self, attr), subattr, nicos_data.value)
if nicos_data.source_name=='ch2_speed':
self.timing.tau = 30./self.timing.chopperSpeed
def monitor(self):
return time()-self.start_time
def restart(self):
# empty event buffer
self.events = np.recarray(0, dtype=EVENT_TYPE)
self.stop_counting.clear()
self.last_read = 0
self.start_time = time()
self.new_events.clear()
def get_events(self, total_counts=False):
packets = np.recarray(0, dtype=PACKET_TYPE)
pulses = np.recarray(0, dtype=PULSE_TYPE)
pc = np.recarray(0, dtype=PC_TYPE)
if total_counts:
last_read = 0
else:
last_read = self.last_read
if last_read>=self.events.shape[0]:
raise EOFError("No new events arrived")
data = AmorEventStream(self.events[last_read:].copy(), packets, pulses, pc)
self.last_read = self.events.shape[0]
self.new_events.clear()
t_now = time()
monitor = t_now-self.last_read_time
self.last_read_time = t_now
return KafkaFrozenData(self.geometry, self.timing, data, monitor=monitor)
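The in-place buffer growth in KafkaEventData.add_events can be sketched in isolation. `EVENT_TYPE` below is a stand-in for `eos.event_data_types.EVENT_TYPE` (the real field layout is an assumption); time of flight arrives in nanoseconds and is stored in seconds, matching the `/1.e9` above:

```python
import numpy as np

# Assumed minimal field layout; the real EVENT_TYPE has more fields.
EVENT_TYPE = [('pixelID', np.int64), ('mask', np.uint8), ('tof', np.float64)]

def append_events(events, pixel_id, tof_ns):
    """Grow the event recarray in place and fill the new slice,
    as KafkaEventData.add_events does for each ev44 packet."""
    prev = events.shape[0]
    events.resize(prev + len(pixel_id), refcheck=False)
    events.pixelID[prev:] = pixel_id
    events.mask[prev:] = 0
    events.tof[prev:] = np.asarray(tof_ns) / 1.0e9  # ns -> s
    return events
```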

eos/kafka_serializer.py Normal file

@@ -0,0 +1,283 @@
"""
Allows sending eos projections to Kafka using ESS histogram serialization.
For histogram_hs01 the message is built using:
hist = {
"source": "some_source",
"timestamp": 123456,
"current_shape": [2, 5],
"dim_metadata": [
{
"length": 2,
"unit": "a",
"label": "x",
"bin_boundaries": np.array([10, 11, 12]),
},
{
"length": 5,
"unit": "b",
"label": "y",
"bin_boundaries": np.array([0, 1, 2, 3, 4, 5]),
},
],
"last_metadata_timestamp": 123456,
"data": np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]),
"errors": np.array([[5, 4, 3, 2, 1], [10, 9, 8, 7, 6]]),
"info": "info_string",
}
"""
import logging
from typing import List, Tuple, Union
from threading import Thread, Event
import numpy as np
import json
from time import time
from dataclasses import dataclass, asdict
from streaming_data_types import histogram_hs01
from confluent_kafka import Producer, Consumer
from uuid import uuid4
from .projection import TofZProjection, YZProjection
KAFKA_BROKER = 'linkafka01.psi.ch:9092'
KAFKA_TOPICS = {
'histogram': 'amor_histograms',
'response': 'amor_histResponse',
'command': 'amor_histCommands'
}
def ktime():
return int(time()*1_000)
@dataclass
class DimMetadata:
length: int
unit: str
label: str
bin_boundaries: np.ndarray
@dataclass
class HistogramMessage:
source: str
timestamp: int
current_shape: Tuple[int, int]
dim_metadata: Tuple[DimMetadata, DimMetadata]
last_metadata_timestamp: int
data: np.ndarray
errors: np.ndarray
info: str
def serialize(self):
return histogram_hs01.serialise_hs01(asdict(self))
@dataclass
class CommandMessage:
msg_id: str
cmd=None
@classmethod
def get_message(cls, data):
"""
Uses the sub-class cmd attribute to select which message to return
"""
msg = dict([(ci.cmd, ci) for ci in cls.__subclasses__()])
return msg[data['cmd']](**data)
@dataclass
class Stop(CommandMessage):
hist_id: str
id: str
cmd:str = 'stop'
@dataclass
class HistogramConfig:
id: str
type: str
data_brokers: List[str]
topic: str
data_topics: List[str]
tof_range: Tuple[float, float]
det_range: Tuple[int, int]
num_bins: int
width: int
height: int
left_edges: list
source: str
@dataclass
class ConfigureHistogram(CommandMessage):
histograms: List[HistogramConfig]
start: int
cmd:str = 'config'
def __post_init__(self):
self.histograms = [HistogramConfig(**cfg) for cfg in self.histograms]
class ESSSerializer:
def __init__(self):
self.producer = Producer({
'bootstrap.servers': KAFKA_BROKER,
'message.max.bytes': 4_000_000,
})
self.consumer = Consumer({
'bootstrap.servers': KAFKA_BROKER,
"group.id": uuid4(),
"default.topic.config": {"auto.offset.reset": "latest"},
})
self._active_histogram_yz = None
self._active_histogram_tofz = None
self.new_count_started = Event()
self.count_stopped = Event()
self.consumer.subscribe([KAFKA_TOPICS['command']])
def process_message(self, message):
if message.error():
logging.error("Command Consumer Error: %s", message.error())
else:
command = json.loads(message.value().decode())
try:
command = CommandMessage.get_message(command)
except Exception:
logging.error(f'Could not interpret message: \n{command}', exc_info=True)
return
logging.info(command)
resp = json.dumps({
"msg_id": getattr(command, "id", None) or command.msg_id,
"response": "ACK",
"message": ""
})
self.producer.produce(
topic=KAFKA_TOPICS['response'],
value=resp
)
self.producer.flush()
if isinstance(command, Stop):
if command.hist_id == self._active_histogram_yz:
self.count_stopped.set()
else:
return
elif isinstance(command, ConfigureHistogram):
for hist in command.histograms:
if hist.topic == KAFKA_TOPICS['histogram']+'_YZ':
self._active_histogram_yz = hist.id
logging.debug(f" histogram data_topic: {hist.data_topics}")
self._start = command.start
self.count_stopped.clear()
self.new_count_started.set()
if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ':
self._active_histogram_tofz = hist.id
def receive(self, timeout=5):
rec = self.consumer.poll(timeout)
if rec is not None:
self.process_message(rec)
return True
else:
return False
def receive_loop(self):
while not self._stop_receiving.is_set():
try:
self.receive()
except Exception:
logging.error("Exception while receiving", exc_info=True)
def start_command_thread(self):
self._stop_receiving = Event()
self._command_thread = Thread(target=self.receive_loop)
self._command_thread.start()
def end_command_thread(self, event=None):
self._stop_receiving.set()
self._command_thread.join()
def acked(self, err, msg):
# We need a callback for the produce method to catch server errors
if err is not None:
logging.warning("Failed to deliver message: %s: %s" % (str(msg), str(err)))
else:
logging.debug("Message produced: %s" % (str(msg)))
def send(self, proj: Union[YZProjection, TofZProjection], final=False):
if final:
state = 'FINISHED'
else:
state = 'COUNTING'
if isinstance(proj, YZProjection):
if self._active_histogram_yz is None:
return
suffix = 'YZ'
message = HistogramMessage(
source='amor-eos',
timestamp=ktime(),
current_shape=(proj.y.shape[0]-1, proj.z.shape[0]-1),
dim_metadata=(
DimMetadata(
length=proj.y.shape[0]-1,
unit="pixel",
label="Y",
bin_boundaries=proj.y,
),
DimMetadata(
length=proj.z.shape[0]-1,
unit="pixel",
label="Z",
bin_boundaries=proj.z,
)
),
last_metadata_timestamp=0,
data=proj.data.cts,
errors=np.sqrt(proj.data.cts),
info=json.dumps({
"start": self._start,
"state": state,
"num events": proj.data.cts.sum()
})
)
logging.info(f" {state}: Sending {proj.data.cts.sum()} events to Nicos")
elif isinstance(proj, TofZProjection):
if self._active_histogram_tofz is None:
return
suffix = 'TofZ'
message = HistogramMessage(
source='amor-eos',
timestamp=ktime(),
current_shape=(proj.tof.shape[0]-1, proj.z.shape[0]-1),
dim_metadata=(
DimMetadata(
length=proj.tof.shape[0]-1,
unit="ms",
label="ToF",
bin_boundaries=proj.tof,
),
DimMetadata(
length=proj.z.shape[0]-1,
unit="pixel",
label="Z",
bin_boundaries=proj.z,
),
),
last_metadata_timestamp=0,
data=proj.data.cts,
errors=np.sqrt(proj.data.cts),
info=json.dumps({
"start": self._start,
"state": state,
"num events": proj.data.I.sum()
})
)
else:
raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented")
self.producer.produce(value=message.serialize(),
topic=KAFKA_TOPICS['histogram']+'_'+suffix,
callback=self.acked)
self.producer.flush()
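The command handling above relies on a subclass registry keyed by the class-level cmd tag; the pattern in CommandMessage.get_message reduces to:

```python
import json
from dataclasses import dataclass

@dataclass
class Command:
    msg_id: str
    cmd = None  # class-level tag, overridden by each subclass

    @classmethod
    def get_message(cls, data):
        # pick the subclass whose cmd tag matches the decoded message
        registry = {sub.cmd: sub for sub in cls.__subclasses__()}
        return registry[data['cmd']](**data)

@dataclass
class Stop(Command):
    hist_id: str
    cmd: str = 'stop'
```

Since `cmd` in the base class is unannotated, it stays a plain class attribute rather than a dataclass field, so only subclasses that annotate it accept it as a constructor argument.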

eos/nicos.py Normal file

@@ -0,0 +1,46 @@
"""
events2histogram visualising data from Amor@SINQ, PSI
Author: Jochen Stahn (algorithms, python draft),
Artur Glavic (structuring and optimisation of code)
"""
import logging
# need to do absolute import here as pyinstaller requires it
from eos.options import E2HConfig, ReaderConfig, ExperimentConfig, E2HReductionConfig
from eos.command_line import commandLineArgs
from eos.logconfig import setup_logging, update_loglevel
def main():
setup_logging()
logging.getLogger('matplotlib').setLevel(logging.WARNING)
# read command line arguments and generate classes holding configuration parameters
clas = commandLineArgs([ReaderConfig, ExperimentConfig, E2HReductionConfig],
'amor-nicos')
update_loglevel(clas.verbose)
if clas.verbose<2:
# only log info level in logfile
logger = logging.getLogger() # logging.getLogger('quicknxs')
logger.setLevel(logging.INFO)
reader_config = ReaderConfig.from_args(clas)
experiment_config = ExperimentConfig.from_args(clas)
reduction_config = E2HReductionConfig.from_args(clas)
config = E2HConfig(reader_config, experiment_config, reduction_config)
logging.warning('######## amor-nicos - Nicos histogram for Amor ########')
from eos.reduction_kafka import KafkaReduction
# only import heavy module if sufficient command line parameters were provided
from eos.reduction_reflectivity import ReflectivityReduction
# Create reducer with these arguments
reducer = KafkaReduction(config)
# Perform actual reduction
reducer.reduce()
logging.info('######## amor-nicos - finished ########')
if __name__ == '__main__':
main()


@@ -6,6 +6,8 @@ import os
import numpy as np
from typing import List, Optional
from orsopy import fileio
from .event_data_types import EventDatasetProtocol
from .header import Header
from .options import NormalisationMethod
@@ -24,7 +26,6 @@ class LZNormalisation:
detZ_e = reference.data.events.detZ
self.monitor = np.sum(reference.data.pulses.monitor)
norm_lz, _, _ = np.histogram2d(lamda_e, detZ_e, bins=(grid.lamda(), grid.z()))
norm_lz = np.where(norm_lz>2, norm_lz, np.nan)
if normalisationMethod==NormalisationMethod.direct_beam:
self.norm = np.flip(norm_lz, 1)
else:
@@ -47,7 +48,7 @@ class LZNormalisation:
self = super().__new__(cls)
with open(filename, 'rb') as fh:
hash = str(np.load(fh, allow_pickle=True))
self.file_list = np.load(fh, allow_pickle=True)
self.file_list = np.load(fh, allow_pickle=True).tolist()
self.angle = np.load(fh, allow_pickle=True)
self.norm = np.load(fh, allow_pickle=True)
self.monitor = np.load(fh, allow_pickle=True)
@@ -99,4 +100,33 @@ class LZNormalisation:
np.save(fh, self.monitor, allow_pickle=False)
def update_header(self, header:Header):
header.measurement_additional_files = self.file_list
header.measurement_additional_files = [fileio.File(file=os.path.basename(entry)) for entry in self.file_list]
def smooth(self, width):
logging.debug(f'apply convolution with gaussian along lambda axis to smooth norm, sigma={width}')
try:
from scipy.signal import fftconvolve
except ImportError:
return self._smooth_slow(width)
kx = np.arange(self.norm.shape[0])-self.norm.shape[0]/2.
kernel = np.exp(-0.5*kx**2/width**2)
kernel/=kernel.sum()
kernel = kernel[:, np.newaxis]*np.ones(self.norm.shape[1])[np.newaxis, :]
unorm = np.where(np.isnan(self.norm), 0., self.norm)
nnorm = fftconvolve(unorm, kernel, mode='same', axes=0)
nnorm[np.isnan(self.norm)] = np.nan
self.norm = nnorm
def _smooth_slow(self, width):
# like smooth but using numpy buildin slow convolve
nnorm = np.zeros_like(self.norm)
kx = np.arange(self.norm.shape[0])-self.norm.shape[0]/2.
kernel = np.exp(-0.5*kx**2/width**2)
kernel/=kernel.sum()
unorm = np.where(np.isnan(self.norm), 0., self.norm)
for row in range(self.norm.shape[1]):
nnorm[:, row] = np.convolve(unorm[:, row], kernel, mode='same')
nnorm[np.isnan(self.norm)] = np.nan
self.norm = nnorm
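The NaN handling in smooth/_smooth_slow (zero-fill before convolving, restore NaN after) keeps masked bins from leaking counts into their neighbours while leaving them masked; a one-column sketch:

```python
import numpy as np

def smooth_column(col, width):
    """Gaussian smoothing of one lambda column, preserving NaN bins,
    following the scheme of LZNormalisation._smooth_slow."""
    kx = np.arange(col.shape[0]) - col.shape[0] / 2.0
    kernel = np.exp(-0.5 * kx**2 / width**2)
    kernel /= kernel.sum()
    filled = np.where(np.isnan(col), 0.0, col)
    out = np.convolve(filled, kernel, mode='same')
    out[np.isnan(col)] = np.nan  # masked bins stay masked
    return out
```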


@@ -21,6 +21,11 @@ except ImportError:
# python <3.10 use Enum instead
from enum import Enum as StrEnum
class InCallString(StrEnum):
auto='auto'
always='always'
never='never'
@dataclass
class CommandlineParameterConfig:
argument: str # default parameter for command line resulting in "--argument"
@@ -28,6 +33,7 @@ class CommandlineParameterConfig:
short_form: Optional[str] = None
group: str = 'misc'
priority: int = 0
in_call_string: InCallString = InCallString.auto
def __gt__(self, other):
"""
@@ -90,6 +96,7 @@ class ArgParsable:
typ = field.type
if get_origin(typ) is list:
args['nargs'] = '+'
args['action'] = 'extend'
typ = get_args(typ)[0]
if get_origin(typ) is tuple:
# tuple of items are put together during evaluation
@@ -117,6 +124,7 @@ class ArgParsable:
group=field.metadata.get('group', 'misc'),
short_form=field.metadata.get('short', None),
priority=field.metadata.get('priority', 0),
in_call_string=field.metadata.get('in_call_string', InCallString.auto),
))
return output
@@ -168,6 +176,34 @@ class ArgParsable:
inpargs[field.name] = value
return cls(**inpargs)
def get_call_parameters(self, abbrv=True):
"""
Return a list of command line arguments that reproduce this config, do not add default parameters.
"""
output = []
for arg in sorted(self.get_commandline_parameters()):
if ((arg.in_call_string==InCallString.auto and self.is_default(arg.argument)) or
arg.in_call_string==InCallString.never):
# skip default arguments or arguments defined to never appear in call string
continue
if arg.short_form and abbrv:
item = '-' + arg.short_form + ' '
else:
item = '--' + arg.argument + ' '
if arg.add_argument_args.get('type', None) in [str, float, int]:
nargs = arg.add_argument_args.get('nargs', None)
if nargs is None:
item += str(getattr(self, arg.argument))
elif nargs=='+':
# remove the default parameters, only show added ones
ignore = len(arg.add_argument_args.get('default', []))
item += ' '.join([str(pi) for pi in getattr(self, arg.argument)[ignore:]])
else:
item += ' '.join([str(pi) for pi in getattr(self, arg.argument)])
# boolean flags only reach this point if they are non-default
output.append((arg, item))
return output
# definition of command line arguments
@dataclass
@@ -178,6 +214,7 @@ class ReaderConfig(ArgParsable):
'short': 'Y',
'group': 'input data',
'help': 'year the measurement was performed',
'in_call_string': InCallString.always,
},
)
rawPath: List[str] = field(
@@ -302,7 +339,7 @@ class ExperimentConfig(ArgParsable):
},
)
muOffset: Optional[float] = field(
default=0,
default=None,
metadata={
'short': 'm',
'group': 'sample',
@@ -381,6 +418,21 @@ class ReflectivityReductionConfig(ArgParsable):
'priority': 90,
'group': 'input data',
'help': 'normalisation method: [o]verillumination, [u]nderillumination, [d]irect_beam'})
normalizationFilter: float = field(
default=-1,
metadata={
'group': 'input data',
'help': 'minimum normalization counts in lambda-theta bin to use, else filter'})
normAngleFilter: float = field(
default=-1,
metadata={
'group': 'input data',
'help': 'minimum normalization counts in total theta bin to use, else filter'})
normalizationSmoothing: float = field(
default=0,
metadata={
'group': 'input data',
'help': 'apply convolution on lambda axes to smooth the normalization data, useful for low statistics'})
scale: List[float] = field(
default_factory=lambda: [1.],
metadata={
@@ -404,7 +456,7 @@ class ReflectivityReductionConfig(ArgParsable):
'group': 'input data',
'help': 'File with R(q_z) curve to be subtracted (in .Rqz.ort format)'})
normalisationFileIdentifier: Optional[List[str]] = field(
default_factory=lambda: [None],
default=None,
metadata={
'short': 'n',
'priority': 90,
@@ -526,85 +578,19 @@ class ReflectivityConfig:
def call_string(self):
base = 'eos'
inpt = ''
if self.reader.year:
inpt += f' -Y {self.reader.year}'
else:
inpt += f' -Y {datetime.now().year}'
if np.shape(self.reader.rawPath)[0] == 1:
inpt += f' --rawPath {self.reader.rawPath}'
if self.reduction.subtract:
inpt += f' -subtract {self.reduction.subtract}'
if self.reduction.normalisationFileIdentifier:
inpt += f' -n {" ".join(self.reduction.normalisationFileIdentifier)}'
if self.reduction.fileIdentifier:
inpt += f' -f {" ".join(self.reduction.fileIdentifier)}'
otpt = ''
if self.reduction.qResolution:
otpt += f' -r {self.reduction.qResolution}'
if self.output.outputPath != '.':
inpt += f' --outputdPath {self.output.outputPath}'
if self.output.outputName:
otpt += f' -o {self.output.outputName}'
if self.output.outputFormats != ['Rqz.ort']:
otpt += f' -of {" ".join(self.output.outputFormats)}'
mask = ''
call_parameters = self.reader.get_call_parameters()
call_parameters += self.output.get_call_parameters()
call_parameters += self.reduction.get_call_parameters()
call_parameters += self.experiment.get_call_parameters()
mask += f' -y {" ".join(str(ii) for ii in self.experiment.yRange)}'
mask += f' -l {" ".join(str(ff) for ff in self.experiment.lambdaRange)}'
mask += f' -t {" ".join(str(ff) for ff in self.reduction.thetaRange)}'
mask += f' -T {" ".join(str(ff) for ff in self.reduction.thetaRangeR)}'
mask += f' -q {" ".join(str(ff) for ff in self.reduction.qzRange)}'
call_parameters.sort()
para = ''
# TODO: Check if we want these parameters for defaults
para += f' --chopperPhase {self.experiment.chopperPhase}'
para += f' --chopperPhaseOffset {self.experiment.chopperPhaseOffset}'
if self.experiment.mu:
para += f' --mu {self.experiment.mu}'
elif self.experiment.muOffset:
para += f' --muOffset {self.experiment.muOffset}'
if self.experiment.nu:
para += f' --nu {self.experiment.nu}'
cpout = f'{base} ' + ' '.join([cp[1] for cp in call_parameters])
modl = ''
if self.experiment.sampleModel:
modl += f" --sampleModel '{self.experiment.sampleModel}'"
acts = ''
if self.reduction.autoscale:
acts += f' --autoscale {" ".join(str(ff) for ff in self.reduction.autoscale)}'
# TODO: Check if should be shown if not default
acts += f' --scale {self.reduction.scale}'
if self.reduction.timeSlize:
acts += f' --timeSlize {" ".join(str(ff) for ff in self.reduction.timeSlize)}'
mlst = base + inpt + otpt
if mask:
mlst += mask
if para:
mlst += para
if acts:
mlst += acts
if modl:
mlst += modl
if len(mlst) > 70:
mlst = base + ' ' + inpt + ' ' + otpt
if mask:
mlst += ' ' + mask
if para:
mlst += ' ' + para
if acts:
mlst += ' ' + acts
if modl:
mlst += ' ' + modl
logging.debug(f'Argument list built in EOSConfig.call_string: {mlst}')
return mlst
logging.debug(f'Argument list built in EOSConfig.call_string: {cpout}')
return cpout
class E2HPlotSelection(StrEnum):
All = 'all'
@@ -655,6 +641,14 @@ class E2HReductionConfig(ArgParsable):
},
)
kafka: bool = field(
default=False,
metadata={
'group': 'output',
'help': 'send results to Kafka for Nicos',
},
)
plotArgs: E2HPlotArguments = field(
default=E2HPlotArguments.Default,
metadata={


@@ -85,12 +85,19 @@ class ProjectedReflectivity:
self.R -= R
self.dR = np.sqrt(self.dR**2+dR**2)
def plot(self, **kwargs):
from matplotlib import pyplot as plt
plt.errorbar(self.Q, self.R, yerr=self.dR, **kwargs)
plt.yscale('log')
plt.xlabel('Q / Å$^{-1}$')
plt.ylabel('R')
class LZProjection(ProjectionInterface):
grid: LZGrid
lamda: np.ndarray
alphaF: np.ndarray
is_normalized: bool
angle: float
data: np.recarray
_dtype = np.dtype([
@@ -107,6 +114,7 @@ class LZProjection(ProjectionInterface):
def __init__(self, tthh: float, grid: LZGrid):
self.grid = grid
self.is_normalized = False
self.angle = tthh
alphaF_z = tthh + Detector.delta_z
lamda_l = self.grid.lamda()
@@ -168,9 +176,12 @@ class LZProjection(ProjectionInterface):
self.data.mask &= self.lamda>=lamda_range[0]
self.data.mask &= self.lamda<=lamda_range[1]
def apply_norm_mask(self, norm: LZNormalisation):
def apply_norm_mask(self, norm: LZNormalisation, min_norm=-1, min_theta=-1):
# Mask points where normalization is NaN
self.data.mask &= np.logical_not(np.isnan(norm.norm))
self.data.mask &= np.logical_not(np.isnan(norm.norm))&(norm.norm>min_norm)
if min_theta>0:
thsum = np.nansum(norm.norm, axis=0)
self.data.mask &= (thsum>min_theta)[np.newaxis, :]
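The two-stage filter in `apply_norm_mask` — a per-point cut on the normalization value and a per-column cut on its sum over the lambda axis — can be illustrated with a small NumPy sketch (toy arrays and thresholds, chosen for illustration only):

```python
import numpy as np

# toy normalization map: 3 lambda rows x 4 theta columns,
# with one NaN and one weakly illuminated column
norm = np.array([[1.0, 0.2, np.nan, 3.0],
                 [1.0, 0.1, 2.0,    3.0],
                 [1.0, 0.1, 2.0,    3.0]])

mask = np.ones_like(norm, dtype=bool)
min_norm, min_theta = 0.5, 2.0

# per-point filter: drop NaNs and weak normalization values
mask &= np.logical_not(np.isnan(norm)) & (norm > min_norm)

# per-column filter: drop theta columns whose summed normalization is too low
thsum = np.nansum(norm, axis=0)              # column sums, NaN treated as 0
mask &= (thsum > min_theta)[np.newaxis, :]   # broadcast over lambda rows

print(mask.sum())  # points surviving both filters
```

The column cut removes angles that were barely illuminated in the reference even if individual points pass the per-point threshold.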
def project(self, dataset: EventDatasetProtocol, monitor: float):
"""
@@ -190,6 +201,7 @@ class LZProjection(ProjectionInterface):
self.data[:] = 0
self.data.mask = True
self.monitor = 0.
self.norm_monitor = 1.
@property
def I(self):
@@ -199,22 +211,25 @@ class LZProjection(ProjectionInterface):
def calc_error(self):
# calculate error bars for resulting intensity after normalization
self.data.err = self.data.ref * np.sqrt( 1./(self.data.I+.1) + 1./self.data.norm )
self.data.err = self.data.ref * np.sqrt( 1./(self.data.I+.1) + 1./(self.data.norm+0.1) )
def normalize_over_illuminated(self, norm: LZNormalisation):
"""
Normalize the dataset, taking into account a difference in
detector angle between measurement and reference.
"""
logging.debug(f' correcting for incident angle difference from norm {norm.angle} to data {self.angle}')
norm_lz = norm.norm
thetaN_z = Detector.delta_z+norm.angle
thetaN_lz = np.ones_like(norm_lz)*thetaN_z
thetaN_lz = np.where(np.absolute(thetaN_lz)>5e-3, thetaN_lz, np.nan)
self.data.mask &= (np.absolute(thetaN_lz)>5e-3)
ref_lz = (self.data.I*np.absolute(thetaN_lz))/(norm_lz*np.absolute(self.alphaF))
delta_lz = np.ones_like(norm_lz)*Detector.delta_z
# do not perform gravity correction for the footprint, as it would require the norm detector distance, which is unknown here
fp_corr_lz = np.where(np.absolute(delta_lz+norm.angle)>5e-3,
(delta_lz+self.angle)/(delta_lz+norm.angle), np.nan)
self.data.mask &= np.logical_not(np.isnan(fp_corr_lz))
self.data.norm = norm_lz*fp_corr_lz
self.norm_monitor = norm.monitor
ref_lz = self.data.I/np.where(self.data.norm>0, self.data.norm, np.nan)
ref_lz *= norm.monitor/self.monitor
ref_lz[np.logical_not(self.data.mask)] = np.nan
self.data.norm = norm_lz
self.data.ref = ref_lz
self.calc_error()
self.is_normalized = True
@@ -234,6 +249,7 @@ class LZProjection(ProjectionInterface):
raise ValueError("Dataset needs to be normalized, first")
self.data.ref *= factor
self.data.err *= factor
self.norm_monitor /= factor
def project_on_qz(self):
if not self.is_normalized:
@@ -241,29 +257,25 @@ class LZProjection(ProjectionInterface):
q_q = self.grid.q()
weights_lzf = self.data.norm[self.data.mask]
q_lzf = self.data.qz[self.data.mask]
R_lzf = self.data.ref[self.data.mask]
dR_lzf = self.data.err[self.data.mask]
I_lzf = self.data.I[self.data.mask]
dq_lzf = self.data.res[self.data.mask]
N_q = np.histogram(q_lzf, bins = q_q, weights = weights_lzf )[0]
# get number of grid points contributing to a bin, filter points with no contribution
N_q = np.histogram(q_lzf, bins = q_q)[0]
N_q = np.where(N_q > 0, N_q, np.nan)
fltr = N_q>0
R_q = np.histogram(q_lzf, bins = q_q, weights = weights_lzf * R_lzf )[0]
R_q = R_q / N_q
dR_q = np.histogram(q_lzf, bins = q_q, weights = (weights_lzf * dR_lzf)**2 )[0]
dR_q = np.sqrt( dR_q ) / N_q
# TODO: different error propagations for dR and dq!
# this is what should work:
#dq_q = np.histogram(q_lzf, bins = q_q, weights = (weights_lzf * dq_lzf)**2 )[0]
#dq_q = np.sqrt( dq_q ) / N_q
# and this actually works:
N_q = np.histogram(q_lzf, bins = q_q, weights = weights_lzf**2 )[0]
N_q = np.where(N_q > 0, N_q, np.nan)
dq_q = np.histogram(q_lzf, bins = q_q, weights = (weights_lzf * dq_lzf)**2 )[0]
dq_q = np.sqrt( dq_q / N_q )
# calculate sum of all normalization weights per bin
W_q = np.maximum(np.histogram(q_lzf, bins = q_q, weights = weights_lzf)[0], 1e-10)
# calculate sum of all dataset counts per bin
I_q = np.histogram(q_lzf, bins = q_q, weights = I_lzf)[0]
# normalize dataset by normalization counts and scale by monitor
R_q = np.where(fltr, I_q*self.norm_monitor/self.monitor / W_q, np.nan)
# relative error from counting statistics of data and normalization: dR/R = sqrt( (dI/I)² + (dW/W)² )
dR_q = np.where(fltr, R_q*(np.sqrt(1./(I_q+0.1)+ 1./(W_q+0.1))), np.nan)
# q-resolution is the weighted mean of the individual points' q-resolutions
dq_q = np.histogram(q_lzf, bins = q_q, weights = weights_lzf * dq_lzf )[0]
dq_q = np.where(fltr, dq_q/W_q, np.nan)
return ProjectedReflectivity(R_q, dR_q, (q_q[1:]+q_q[:-1])/2., dq_q)
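The binning logic of `project_on_qz` boils down to a few weighted histograms; a self-contained sketch with toy numbers (not the instrument code, variable names mirror the comments above):

```python
import numpy as np

rng = np.random.default_rng(1)
q = rng.uniform(0.0, 0.1, 1000)          # q_z value of each accepted grid point
I = rng.poisson(50, 1000).astype(float)  # data counts per point
W = rng.uniform(0.5, 2.0, 1000)          # normalization weight per point
q_edges = np.linspace(0.0, 0.1, 21)
monitor, norm_monitor = 120.0, 100.0

# sum of weights and counts per bin; bins with no contribution become NaN later
W_q = np.maximum(np.histogram(q, bins=q_edges, weights=W)[0], 1e-10)
I_q = np.histogram(q, bins=q_edges, weights=I)[0]
N_q = np.histogram(q, bins=q_edges)[0]
fltr = N_q > 0

# reflectivity: counts normalized by weights, scaled by the monitor ratio
R_q = np.where(fltr, I_q * norm_monitor / monitor / W_q, np.nan)
# counting-statistics error: dR/R = sqrt(1/I + 1/W), regularized for low counts
dR_q = np.where(fltr, R_q * np.sqrt(1.0/(I_q + 0.1) + 1.0/(W_q + 0.1)), np.nan)
```

Note that, as the comments above state, the q-resolution column is averaged rather than error-propagated, because it describes the bin's resolution, not a statistical uncertainty.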
def plot(self, **kwargs):
@@ -298,7 +310,7 @@ class LZProjection(ProjectionInterface):
plt.colorbar(label='I / cpm')
plt.xlabel('$\\lambda$ / $\\AA$')
plt.ylabel('$\\Theta$ / °')
plt.xlim(3., 12.)
plt.xlim(self.lamda[0,0], self.lamda[-1,0])
af = self.alphaF[self.data.mask]
plt.ylim(af.min(), af.max())
plt.title('Wavelength vs. Reflection Angle')
@@ -544,12 +556,12 @@ class TofZProjection(ProjectionInterface):
('err', np.float64),
])
def __init__(self, tau, foldback=False):
def __init__(self, tau, foldback=False, combine=1):
self.z = np.arange(Detector.nBlades*Detector.nWires+1)-0.5
if foldback:
self.tof = np.arange(0, tau, 0.0005)
self.tof = np.arange(0, tau, 0.0005*combine)
else:
self.tof = np.arange(0, 2*tau, 0.0005)
self.tof = np.arange(0, 2*tau, 0.0005*combine)
self.data = np.zeros((self.tof.shape[0]-1, self.z.shape[0]-1), dtype=self._dtype).view(np.recarray)
self.monitor = 0.
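The new `combine` parameter of `TofZProjection.__init__` coarsens the time-of-flight axis by widening the 0.5 ms base bins; a minimal sketch (the `tau` value is illustrative):

```python
import numpy as np

tau = 0.072  # chopper period in seconds (illustrative value)

def tof_edges(tau, foldback=False, combine=1):
    """Bin edges for the tof axis: 0.5 ms base width scaled by `combine`."""
    stop = tau if foldback else 2*tau
    return np.arange(0, stop, 0.0005*combine)

fine   = tof_edges(tau, foldback=True, combine=1)
coarse = tof_edges(tau, foldback=True, combine=2)
# doubling `combine` doubles the bin width, roughly halving the bin count
```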


@@ -51,6 +51,7 @@ class E2HReduction:
self.plot_kwds = {}
plt.rcParams.update({'font.size': self.config.reduction.fontsize})
self.overwrite = eh.ApplyParameterOverwrites(self.config.experiment) # some actions use instrument parameters, change before that
if self.config.reduction.update:
# live update implies plotting
self.config.reduction.show_plot = True
@@ -58,6 +59,9 @@ class E2HReduction:
if self.config.reduction.plot==E2HPlotSelection.Raw:
# Raw implies fast calculations
self.config.reduction.fast = True
if not self.config.experiment.is_default('lambdaRange'):
# filtering wavelength requires frame analysis
self.config.reduction.fast = False
if not self.config.reduction.fast or self.config.reduction.plot in NEEDS_LAMDA:
from . import event_analysis as ea
@@ -65,6 +69,7 @@ class E2HReduction:
# Actions on datasets not used for normalization
self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset)
if not self.config.reduction.fast:
self.event_actions |= self.overwrite
self.event_actions |= eh.CorrectChopperPhase()
self.event_actions |= ea.ExtractWalltime()
else:
@@ -84,8 +89,8 @@ class E2HReduction:
# perform corrections for tof if not fast mode
self.event_actions |= eh.TofTimeCorrection(self.config.experiment.incidentAngle==IncidentAngle.alphaF)
# select needed actions depending on the plots
if self.config.reduction.plot in NEEDS_LAMDA:
self.event_actions |= ea.MergeFrames()
if self.config.reduction.plot in NEEDS_LAMDA or not self.config.experiment.is_default('lambdaRange'):
self.event_actions |= ea.MergeFrames(lamdaCut=self.config.experiment.lambdaRange[0])
self.event_actions |= ea.AnalyzePixelIDs(self.config.experiment.yRange)
self.event_actions |= eh.TofTimeCorrection(self.config.experiment.incidentAngle==IncidentAngle.alphaF)
self.event_actions |= ea.CalculateWavelength(self.config.experiment.lambdaRange)
@@ -93,7 +98,8 @@ class E2HReduction:
# plot dependent options
if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.LT, E2HPlotSelection.Q]:
self.grid = LZGrid(0.01, [0.0, 0.25])
self.grid = LZGrid(0.05, [0.0, 0.25], lambda_overwrite=self.config.experiment.lambdaRange)
self.grid.dldl = 0.01
if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.Raw,
E2HPlotSelection.LT, E2HPlotSelection.YT,
@@ -124,6 +130,12 @@ class E2HReduction:
if self.config.reduction.plotArgs==E2HPlotArguments.Default and not self.config.reduction.update:
# save to image file if not auto-updating graph
plt.savefig(f'e2h_{self.config.reduction.plot}.png', dpi=300)
if self.config.reduction.kafka:
from .kafka_serializer import ESSSerializer
self.serializer = ESSSerializer()
self.fig.canvas.mpl_connect('close_event', self.serializer.end_command_thread)
self.serializer.start_command_thread()
self.serializer.send(self.projection)
if self.config.reduction.update:
self.timer = self.fig.canvas.new_timer(1000)
self.timer.add_callback(self.update)
@@ -131,6 +143,7 @@ class E2HReduction:
if self.config.reduction.show_plot:
plt.show()
def register_colormap(self):
cmap = plt.colormaps['turbo'](np.arange(256))
cmap[:1, :] = np.array([256/256, 255/256, 236/256, 1])
@@ -140,6 +153,7 @@ class E2HReduction:
def prepare_graphs(self):
last_file_header = AmorHeader(self.file_list[-1])
self.overwrite.perform_action(last_file_header)
tthh = last_file_header.geometry.nu - last_file_header.geometry.mu
if not self.config.reduction.is_default('thetaRangeR'):
@@ -222,7 +236,7 @@ class E2HReduction:
self.event_actions(self.dataset)
self.dataset.update_header(self.header)
self.header.measurement_data_files.append(fileio.File(file=fileName.split('/')[-1],
self.header.measurement_data_files.append(fileio.File(file=os.path.basename(fileName),
timestamp=self.dataset.fileDate))
def add_data(self):
@@ -298,4 +312,7 @@ class E2HReduction:
self.projection.update_plot()
plt.suptitle(self.create_title())
plt.draw()
plt.draw()
if self.config.reduction.kafka:
self.serializer.send(self.projection)

eos/reduction_kafka.py (new file)

@@ -0,0 +1,128 @@
"""
Live reduction of event data streamed from Kafka, for display during the experiment.
Results are updated continuously as new events arrive.
"""
import logging
import os
from time import sleep
from .kafka_events import KafkaEventData
from .header import Header
from .options import E2HConfig
from . import event_handling as eh, event_analysis as ea
from .projection import TofZProjection, YZProjection
from .kafka_serializer import ESSSerializer
class KafkaReduction:
config: E2HConfig
header: Header
event_actions: eh.EventDataAction
_last_mtime = 0.
proj_yz: YZProjection
proj_tofz = TofZProjection
def __init__(self, config: E2HConfig):
self.config = config
self.header = Header()
self.event_data = KafkaEventData()
self.event_data.start()
self.prepare_actions()
def prepare_actions(self):
"""
Does not do any actual reduction.
"""
# Actions on datasets not used for normalization
self.event_actions = eh.ApplyPhaseOffset(self.config.experiment.chopperPhaseOffset)
self.event_actions |= eh.CorrectChopperPhase()
self.event_actions |= ea.MergeFrames()
self.event_actions |= eh.ApplyMask()
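The `|=` chaining in `prepare_actions` suggests that event actions compose into a pipeline that is applied to a dataset in order. A minimal sketch of such a composable-action pattern (hypothetical toy class, not the eos implementation):

```python
class Action:
    """Composable action: `a | b` applies a, then b, to the same dataset."""
    def __init__(self, func):
        self.funcs = [func]

    def __or__(self, other):
        combined = Action(lambda ds: ds)      # placeholder, replaced below
        combined.funcs = self.funcs + other.funcs
        return combined

    def __call__(self, dataset):
        for f in self.funcs:
            f(dataset)

log = []
chain = Action(lambda ds: log.append(('phase', ds)))
chain |= Action(lambda ds: log.append(('merge', ds)))  # `|=` falls back to `|`
chain |= Action(lambda ds: log.append(('mask', ds)))
chain('events')
# log now records the three steps applied in order
```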
def reduce(self):
self.create_projections()
self.read_data()
self.add_data()
self.serializer = ESSSerializer()
self.serializer.start_command_thread()
self.loop()
def create_projections(self):
self.proj_yz = YZProjection()
self.proj_tofz = TofZProjection(self.event_data.timing.tau, foldback=True, combine=2)
def read_data(self):
# make sure the first events have arrived before starting analysis
self.event_data.new_events.wait()
self.dataset = self.event_data.get_events()
self.event_actions(self.dataset)
def add_data(self):
self.monitor = self.dataset.monitor
self.proj_yz.project(self.dataset, monitor=self.monitor)
self.proj_tofz.project(self.dataset, monitor=self.monitor)
def loop(self):
self.wait_for = self.serializer.new_count_started
while True:
try:
self.update()
self.wait_for.wait(1.0)
except KeyboardInterrupt:
self.event_data.stop_event.set()
self.event_data.join()
self.serializer.end_command_thread()
return
def update(self):
if self.serializer.new_count_started.is_set():
logging.warning('Start new count, clearing event data')
self.wait_for = self.serializer.count_stopped
self.event_data.restart()
self.serializer.new_count_started.clear()
self.create_projections()
return
elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set():
return self.finish_count()
try:
update_data = self.event_data.get_events()
except EOFError:
return
logging.info(" updating with new data")
self.event_actions(update_data)
self.dataset=update_data
self.monitor = self.dataset.monitor
self.proj_yz.project(update_data, self.monitor)
self.proj_tofz.project(update_data, self.monitor)
self.serializer.send(self.proj_yz)
self.serializer.send(self.proj_tofz)
def finish_count(self):
logging.debug(" stop event set, hold event collection and send final results")
self.wait_for = self.serializer.new_count_started
self.event_data.stop_counting.set()
try:
update_data = self.event_data.get_events()
except EOFError:
pass
else:
self.event_actions(update_data)
self.dataset = update_data
self.monitor = self.dataset.monitor
self.proj_yz.project(update_data, self.monitor)
self.proj_tofz.project(update_data, self.monitor)
logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}')
self.serializer.send(self.proj_yz, final=True)
self.serializer.send(self.proj_tofz, final=True)
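The `loop`/`update`/`finish_count` interplay above is driven by `threading.Event` handshakes between the serializer thread and the reduction loop. A stripped-down sketch of that polling pattern (toy stand-ins, not the eos classes):

```python
import threading
import time

stop = threading.Event()   # stand-in for count_stopped
updates = []

def producer():
    # simulates the command thread setting the event after a while
    time.sleep(0.05)
    stop.set()

t = threading.Thread(target=producer)
t.start()
while not stop.is_set():
    updates.append('update')   # stand-in for projecting and sending new events
    stop.wait(0.02)            # sleeps, but wakes early if the event is set
updates.append('final')        # stand-in for sending the final results
t.join()
```

Using `Event.wait(timeout)` instead of a plain `sleep` lets the loop react to a stop signal without waiting for the full poll interval.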


@@ -85,6 +85,9 @@ class ReflectivityReduction:
else:
self.norm = LZNormalisation.unity(self.grid)
if self.config.reduction.normalizationSmoothing:
self.norm.smooth(self.config.reduction.normalizationSmoothing)
# load R(q_z) curve to be subtracted:
if self.config.reduction.subtract:
self.sq_q, self.sR_q, self.sdR_q, self.sFileName = self.loadRqz(self.config.reduction.subtract)
@@ -101,6 +104,16 @@ class ReflectivityReduction:
self.read_file_block(i, short_notation)
# output
if self.config.output.is_default('outputName'):
import datetime
_date = datetime.datetime.now().replace(microsecond=0).isoformat()
if self.header.sample.name:
_sampleName = self.header.sample.name.replace(' ', '_')
else:
_sampleName = 'unknown'
_mu = int(self.dataset.geometry.mu * 3)
self.config.output.outputName = f'{_sampleName}_{_mu:03}_{_date}'
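The default output name built here joins the sample name (spaces replaced), three times µ truncated to an integer, and an ISO timestamp; with illustrative values:

```python
import datetime

sample_name = 'Ni Ti multilayer'   # illustrative sample name
mu = 0.7                           # illustrative mu angle in degrees

_date = datetime.datetime.now().replace(microsecond=0).isoformat()
_sampleName = sample_name.replace(' ', '_') if sample_name else 'unknown'
_mu = int(mu * 3)                  # 0.7 deg -> 2
outputName = f'{_sampleName}_{_mu:03}_{_date}'
# e.g. 'Ni_Ti_multilayer_002_2026-02-20T09:47:00'
```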
logging.warning('output:')
if 'Rqz.ort' in self.config.output.outputFormats:
@@ -117,7 +130,7 @@ class ReflectivityReduction:
plt.show()
def read_file_block(self, i, short_notation):
logging.warning('reading input:')
logging.warning('input:')
file_list = self.path_resolver.resolve(short_notation)
self.header.measurement_data_files = []
@@ -146,7 +159,7 @@ class ReflectivityReduction:
self.dataset.append(di)
for fileName in file_list:
self.header.measurement_data_files.append(fileio.File( file=fileName.split('/')[-1],
self.header.measurement_data_files.append(fileio.File( file=os.path.basename(fileName),
timestamp=self.dataset.fileDate))
@@ -159,7 +172,7 @@ class ReflectivityReduction:
def analyze_unsliced(self, i):
self.monitor = self.dataset.data.pulses.monitor.sum()
logging.warning(f' monitor = {self.monitor:8.2f} {MONITOR_UNITS[self.config.experiment.monitorType]}')
logging.info(f' monitor = {self.monitor:8.2f} {MONITOR_UNITS[self.config.experiment.monitorType]}')
proj:LZProjection = self.project_on_lz()
try:
@@ -364,7 +377,7 @@ class ReflectivityReduction:
if reference.data.events.shape[0] > 1e6:
self.norm.safe(n_path, self.normevent_actions.action_hash())
self.header.measurement_additional_files = self.norm.file_list
self.norm.update_header(self.header)
self.header.reduction.corrections.append('normalisation with \'additional files\'')
def project_on_lz(self, dataset=None):
@@ -390,7 +403,8 @@ class ReflectivityReduction:
proj.apply_lamda_mask(self.config.experiment.lambdaRange)
proj.apply_norm_mask(self.norm)
proj.apply_norm_mask(self.norm, min_norm=self.config.reduction.normalizationFilter,
min_theta=self.config.reduction.normAngleFilter)
proj.project(dataset, self.monitor)

nicos_config.md (new file)

@@ -0,0 +1,43 @@
EOS-Service
===========
EOS can be used as a histogram service to send images to the Nicos instrument control software.
For that, run it on the amor instrument computer:
```bash
amor-nicos [-vv]
```
The Nicos instrument configuration needs a Kafka JustBinItImage device
for each histogram that should be used:
```python
hist_yz = device('nicos_sinq.devices.just_bin_it.JustBinItImage',
description = 'Detector pixel histogram over all times',
hist_topic = 'AMOR_histograms_YZ',
data_topic = 'AMOR_detector',
command_topic = 'AMOR_histCommands',
brokers = ['linkafka01.psi.ch:9092'],
unit = 'evts',
hist_type = '2-D SANSLLB',
det_width = 446,
det_height = 64,
),
hist_tofz = device('nicos_sinq.devices.just_bin_it.JustBinItImage',
description = 'Detector time of flight vs. z-pixel histogram over all y-values',
hist_topic = 'AMOR_histograms_TofZ',
data_topic = 'AMOR_detector',
command_topic = 'AMOR_histCommands',
brokers = ['linkafka01.psi.ch:9092'],
unit = 'evts',
hist_type = '2-D SANSLLB',
det_width = 118,
det_height = 446,
),
```
These images then have to be listed as _images_ items in the detector configuration:
```
images=['hist_yz', 'hist_tofz'],
```


@@ -35,3 +35,4 @@ Homepage = "https://github.com/jochenstahn/amor"
console_scripts =
eos = eos.__main__:main
events2histogram = eos.e2h:main
amor-nicos = eos.nicos:main

update.md (new file)

@@ -0,0 +1,16 @@
Make new release
================
- Update revision in `eos/__init__.py`
- Commit changes `git commit -a -m "your message here"`
- Tag version `git tag v3.x.y`
- Push changes `git push` and `git push --tags`
- This should trigger the **Release** action on GitHub that builds a new version and uploads it to PyPI.
Update on AMOR
==============
- Login via SSH using the **amor** user.
- Activate eos virtual environment `source /home/software/virtualenv/eosenv/bin/activate`
- Update eos package `pip install --upgrade amor-eos`