Fix kafka event ToV value and send actual final counts after stop signal
This commit is contained in:
@@ -115,7 +115,7 @@ class KafkaEventData(Thread):
|
||||
self.events.resize(prev_size+new_events, refcheck=False)
|
||||
self.events.pixelID[prev_size:] = events.pixel_id
|
||||
self.events.mask[prev_size:] = 0
|
||||
self.events.tof[prev_size:] = events.time_of_flight
|
||||
self.events.tof[prev_size:] = events.time_of_flight/1.e9
|
||||
|
||||
nicos_mapping = {
|
||||
'mu': ('geometry', 'mu'),
|
||||
|
||||
@@ -133,8 +133,6 @@ class ESSSerializer:
|
||||
})
|
||||
self._active_histogram_yz = None
|
||||
self._active_histogram_tofz = None
|
||||
self._last_message_yz = None
|
||||
self._last_message_tofz = None
|
||||
self.new_count_started = Event()
|
||||
self.count_stopped = Event()
|
||||
|
||||
@@ -163,34 +161,9 @@ class ESSSerializer:
|
||||
self.producer.flush()
|
||||
if isinstance(command, Stop):
|
||||
if command.hist_id == self._active_histogram_yz:
|
||||
suffix = 'YZ'
|
||||
self._active_histogram_yz = None
|
||||
message = self._last_message_yz
|
||||
message.timestamp = ktime()
|
||||
message.info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": 'FINISHED',
|
||||
"num events": message.data.sum()
|
||||
})
|
||||
self._last_message_yz = None
|
||||
self.count_stopped.set()
|
||||
elif command.hist_id == self._active_histogram_tofz:
|
||||
suffix = 'TofZ'
|
||||
self._active_histogram_tofz = None
|
||||
message = self._last_message_tofz
|
||||
message.timestamp = ktime()
|
||||
message.info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": 'FINISHED',
|
||||
"num events": message.data.sum()
|
||||
})
|
||||
self._last_message_tofz = None
|
||||
else:
|
||||
return
|
||||
self.producer.produce(value=message.serialize(),
|
||||
topic=KAFKA_TOPICS['histogram']+'_'+suffix,
|
||||
callback=self.acked)
|
||||
self.producer.flush()
|
||||
elif isinstance(command, ConfigureHistogram):
|
||||
for hist in command.histograms:
|
||||
if hist.topic == KAFKA_TOPICS['histogram']+'_YZ':
|
||||
@@ -199,7 +172,6 @@ class ESSSerializer:
|
||||
self._start = command.start
|
||||
self.count_stopped.clear()
|
||||
self.new_count_started.set()
|
||||
self.set_empty_messages()
|
||||
if hist.topic == KAFKA_TOPICS['histogram']+'_TofZ':
|
||||
self._active_histogram_tofz = hist.id
|
||||
|
||||
@@ -234,7 +206,11 @@ class ESSSerializer:
|
||||
else:
|
||||
logging.debug("Message produced: %s" % (str(msg)))
|
||||
|
||||
def send(self, proj: Union[YZProjection, TofZProjection]):
|
||||
def send(self, proj: Union[YZProjection, TofZProjection], final=False):
|
||||
if final:
|
||||
state = 'FINISHED'
|
||||
else:
|
||||
state = 'COUNTING'
|
||||
if isinstance(proj, YZProjection):
|
||||
if self._active_histogram_yz is None:
|
||||
return
|
||||
@@ -262,12 +238,11 @@ class ESSSerializer:
|
||||
errors=np.sqrt(proj.data.cts),
|
||||
info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": 'COUNTING',
|
||||
"state": state,
|
||||
"num events": proj.data.cts.sum()
|
||||
})
|
||||
)
|
||||
self._last_message_yz = message
|
||||
logging.info(f" Sending {proj.data.cts.sum()} events to Nicos")
|
||||
logging.info(f" {state}: Sending {proj.data.cts.sum()} events to Nicos")
|
||||
elif isinstance(proj, TofZProjection):
|
||||
if self._active_histogram_tofz is None:
|
||||
return
|
||||
@@ -295,11 +270,10 @@ class ESSSerializer:
|
||||
errors=np.sqrt(proj.data.cts),
|
||||
info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": 'COUNTING',
|
||||
"state": state,
|
||||
"num events": proj.data.I.sum()
|
||||
})
|
||||
)
|
||||
self._last_message_tofz = message
|
||||
else:
|
||||
raise NotImplementedError(f"Histogram for {proj.__class__.__name__} not implemented")
|
||||
|
||||
@@ -307,59 +281,3 @@ class ESSSerializer:
|
||||
topic=KAFKA_TOPICS['histogram']+'_'+suffix,
|
||||
callback=self.acked)
|
||||
self.producer.flush()
|
||||
|
||||
def set_empty_messages(self):
|
||||
self._last_message_yz = HistogramMessage(
|
||||
source='amor-eos',
|
||||
timestamp=ktime(),
|
||||
current_shape=(64, 448),
|
||||
dim_metadata=(
|
||||
DimMetadata(
|
||||
length=64,
|
||||
unit="pixel",
|
||||
label="Y",
|
||||
bin_boundaries=np.arange(64),
|
||||
),
|
||||
DimMetadata(
|
||||
length=448,
|
||||
unit="pixel",
|
||||
label="Z",
|
||||
bin_boundaries=np.arange(448),
|
||||
)
|
||||
),
|
||||
last_metadata_timestamp=0,
|
||||
data=np.zeros((64, 448)),
|
||||
errors=np.zeros((64, 448)),
|
||||
info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": 'COUNTING',
|
||||
"num events": 0
|
||||
})
|
||||
)
|
||||
self._last_message_tofz = message = HistogramMessage(
|
||||
source='amor-eos',
|
||||
timestamp=ktime(),
|
||||
current_shape=(56, 448),
|
||||
dim_metadata=(
|
||||
DimMetadata(
|
||||
length=56,
|
||||
unit="ms",
|
||||
label="ToF",
|
||||
bin_boundaries=np.arange(56),
|
||||
),
|
||||
DimMetadata(
|
||||
length=448,
|
||||
unit="pixel",
|
||||
label="Z",
|
||||
bin_boundaries=np.arange(448),
|
||||
),
|
||||
),
|
||||
last_metadata_timestamp=0,
|
||||
data=np.zeros((56, 448)),
|
||||
errors=np.zeros((56, 448)),
|
||||
info=json.dumps({
|
||||
"start": self._start,
|
||||
"state": 'COUNTING',
|
||||
"num events": 0
|
||||
})
|
||||
)
|
||||
|
||||
@@ -89,10 +89,9 @@ class KafkaReduction:
|
||||
self.serializer.new_count_started.clear()
|
||||
self.proj_yz.clear()
|
||||
self.proj_tofz.clear()
|
||||
return
|
||||
elif self.serializer.count_stopped.is_set() and not self.event_data.stop_counting.is_set():
|
||||
logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}')
|
||||
self.wait_for = self.serializer.new_count_started
|
||||
self.event_data.stop_counting.set()
|
||||
return self.finish_count()
|
||||
try:
|
||||
update_data = self.event_data.get_events()
|
||||
except EOFError:
|
||||
@@ -107,3 +106,24 @@ class KafkaReduction:
|
||||
|
||||
self.serializer.send(self.proj_yz)
|
||||
self.serializer.send(self.proj_tofz)
|
||||
|
||||
def finish_count(self):
|
||||
logging.debug(" stop event set, hold event collection and send final results")
|
||||
self.wait_for = self.serializer.new_count_started
|
||||
self.event_data.stop_counting.set()
|
||||
|
||||
try:
|
||||
update_data = self.event_data.get_events()
|
||||
except EOFError:
|
||||
pass
|
||||
else:
|
||||
self.event_actions(update_data)
|
||||
self.dataset = update_data
|
||||
self.monitor = self.dataset.monitor
|
||||
self.proj_yz.project(update_data, self.monitor)
|
||||
self.proj_tofz.project(update_data, self.monitor)
|
||||
|
||||
logging.warning(f' stop counting, total events {int(self.proj_tofz.data.cts.sum())}')
|
||||
|
||||
self.serializer.send(self.proj_yz, final=True)
|
||||
self.serializer.send(self.proj_tofz, final=True)
|
||||
|
||||
Reference in New Issue
Block a user