Transfer to NICOS working for YZ and TofZ projections

This commit is contained in:
2025-10-14 14:48:49 +02:00
parent fc1bd66c3d
commit 0b7a56628f
2 changed files with 141 additions and 38 deletions

View File

@@ -27,7 +27,8 @@ hist = {
"info": "info_string",
}
"""
from typing import Tuple, Union
import logging
from typing import List, Tuple, Union
import numpy as np
import json
@@ -38,7 +39,17 @@ from confluent_kafka import Producer, Consumer, TopicPartition
from uuid import uuid4
from .projection import LZProjection, YZProjection
from .projection import TofZProjection, YZProjection
KAFKA_BROKER = 'linkafka01.psi.ch:9092'
KAFKA_TOPICS = {
'histogram': 'AMOR_histograms',
'response': 'AMOR_histResponse',
'command': 'AMOR_histCommands'
}
def ktime():
return int(time()*1_000)
@dataclass
class DimMetadata:
@@ -61,60 +72,148 @@ class HistogramMessage:
def serialize(self):
return histogram_hs00.serialise_hs00(asdict(self))
@dataclass
class CommandMessage:
msg_id: str
cmd=None
@classmethod
def get_message(cls, data):
"""
Uses the sub-class cmd attribute to select which message to retugn
"""
msg = dict([(ci.cmd, ci) for ci in cls.__subclasses__()])
return msg[data['cmd']](**data)
@dataclass
class Stop(CommandMessage):
hist_id: str
id: str
cmd:str = 'stop'
@dataclass
class HistogramConfig:
id: str
type: str
data_brokers: List[str]
topic: str
data_topics: List[str]
tof_range: Tuple[float, float]
det_range: Tuple[int, int]
num_bins: int
width: int
height: int
left_edges: list
source: str
@dataclass
class ConfigureHistogram(CommandMessage):
histograms: List[HistogramConfig]
start: int
cmd:str = 'config'
def __post_init__(self):
self.histograms = [HistogramConfig(**cfg) for cfg in self.histograms]
class ESSSerializer:
def __init__(self):
self.producer = Producer({
'bootstrap.servers': 'linkafka01.psi.ch:9092',
'bootstrap.servers': KAFKA_BROKER,
'message.max.bytes': 4_000_000,
})
self.consumer = Consumer({
'bootstrap.servers': 'linkafka01.psi.ch:9092',
'bootstrap.servers': KAFKA_BROKER,
"group.id": uuid4(),
"default.topic.config": {"auto.offset.reset": "latest"},
})
self._active_histogram = None
self._last_message = None
#tp = [TopicPartition( "SANSLLB_histCommands",0)]
#self.consumer.assign(tp)
self.consumer.subscribe(["SANSLLB_histCommands"])
self.consumer.subscribe([KAFKA_TOPICS['command']])
def process_message(self, message):
if message.error():
print("Command Consumer Error: %s", message.error())
logging.error("Command Consumer Error: %s", message.error())
else:
command = json.loads(message.value().decode())
print(command)
try:
command = CommandMessage.get_message(command)
except Exception:
logging.error(f'Could not interpret message: \n{command}', exc_info=True)
return
logging.warning(command)
resp = json.dumps({
"msg_id": command.get("id") or command.get("msg_id"),
"msg_id": getattr(command, "id", None) or command.msg_id,
"response": "ACK",
"message": ""
})
self.producer.produce(
topic="SANSLLB_histResponse",
topic=KAFKA_TOPICS['response'],
value=resp
)
self.producer.flush()
if isinstance(command, Stop) and command.hist_id == self._active_histogram:
self._active_histogram = None
message = self._last_message
message.timestamp = ktime()
message.info=json.dumps({
"start": self._start,
"state": 'FINISHED',
"num events": message.data.sum()
})
self._last_message = None
self.producer.produce(value=message.serialize(),
topic=KAFKA_TOPICS['histogram'],
callback=self.acked)
self.producer.flush()
elif isinstance(command, ConfigureHistogram):
self._active_histogram = command.histograms[0].id
self._start = command.start
def receive(self, timeout=5):
rec = self.consumer.poll(5)
rec = self.consumer.poll(timeout)
if rec is not None:
self.process_message(rec)
return True
else:
return False
def receive_loop(self):
while self._keep_receiving:
try:
self.receive()
except Exception:
logging.error("Exception while receiving", exc_info=True)
def start_command_thread(self):
from threading import Thread
self._keep_receiving = True
self._command_thread = Thread(target=self.receive_loop)
self._command_thread.start()
def end_command_thread(self, event=None):
self._keep_receiving = False
self._command_thread.join()
def acked(self, err, msg):
# We need to have callback to produce-method to catch server errors
if err is not None:
print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
logging.warning("Failed to deliver message: %s: %s" % (str(msg), str(err)))
else:
print("Message produced: %s" % (str(msg)))
logging.debug("Message produced: %s" % (str(msg)))
def send(self, proj: Union[YZProjection, LZProjection]):
def send(self, proj: Union[YZProjection, TofZProjection]):
if self._active_histogram is None:
proj.clear()
return
if isinstance(proj, YZProjection):
message = HistogramMessage(
source='just-bin-it',
timestamp=int(time()),
source='amor-eos',
timestamp=ktime(),
current_shape=(proj.y.shape[0]-1, proj.z.shape[0]-1),
dim_metadata=(
DimMetadata(
@@ -131,38 +230,38 @@ class ESSSerializer:
)
),
last_metadata_timestamp=0,
data=proj.data.I,
errors=proj.data.err,
data=proj.data.cts,
errors=np.sqrt(proj.data.cts),
info=json.dumps({
"start": int(time()),
"start": self._start,
"state": 'COUNTING',
"num events": proj.data.cts.sum()
})
)
elif isinstance(proj, LZProjection):
elif isinstance(proj, TofZProjection):
message = HistogramMessage(
source='just-bin-it',
timestamp=int(time()),
current_shape=(proj.lamda.shape[0]-1, proj.alphaF.shape[0]-1),
source='amor-eos',
timestamp=ktime(),
current_shape=(proj.tof.shape[0]-1, proj.z.shape[0]-1),
dim_metadata=(
DimMetadata(
length=proj.lamda.shape[0]-1,
unit="Angstrom",
label="Lambda",
bin_boundaries=proj.lamda,
length=proj.tof.shape[0]-1,
unit="ms",
label="ToF",
bin_boundaries=proj.tof,
),
DimMetadata(
length=proj.alphaF.shape[0]-1,
unit="degrees",
label="Theta",
bin_boundaries=proj.alphaF,
)
length=proj.z.shape[0]-1,
unit="pixel",
label="Z",
bin_boundaries=proj.z,
),
),
last_metadata_timestamp=0,
data=proj.data.ref,
errors=proj.data.err,
data=proj.data.cts,
errors=np.sqrt(proj.data.cts),
info=json.dumps({
"start": int(time()),
"start": self._start,
"state": 'COUNTING',
"num events": proj.data.I.sum()
})
@@ -170,7 +269,8 @@ class ESSSerializer:
else:
raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented")
self._last_message = message
self.producer.produce(value=message.serialize(),
topic='SANSLLB_histograms',
topic=KAFKA_TOPICS['histogram'],
callback=self.acked)
self.producer.flush()

View File

@@ -93,7 +93,8 @@ class E2HReduction:
# plot dependant options
if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.LT, E2HPlotSelection.Q]:
self.grid = LZGrid(0.01, [0.0, 0.25])
self.grid = LZGrid(0.05, [0.0, 0.25])
self.grid.dldl = 0.05
if self.config.reduction.plot in [E2HPlotSelection.All, E2HPlotSelection.Raw,
E2HPlotSelection.LT, E2HPlotSelection.YT,
@@ -127,6 +128,8 @@ class E2HReduction:
if self.config.reduction.kafka:
from .kafka_serializer import ESSSerializer
self.serializer = ESSSerializer()
self.fig.canvas.mpl_connect('close_event', self.serializer.end_command_thread)
self.serializer.start_command_thread()
self.serializer.send(self.projection)
if self.config.reduction.update:
self.timer = self.fig.canvas.new_timer(1000)