switch bsread Sender mode to PUB and non-blocking; select specific channels to send; re-pack data before buffering; use the correct pulse ID
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import argparse
|
||||
|
||||
from bsread.sender import Sender
|
||||
from bsread.sender import Sender, PUB
|
||||
|
||||
from utils import FileHandler, Sorter
|
||||
from zmqsocks import ZMQSocketsAccumulator, make_address
|
||||
@@ -32,7 +32,7 @@ def accumulate(accumulator_addr, bsread_port):
|
||||
sorter = Sorter()
|
||||
|
||||
if bsread_port:
|
||||
sender = Sender(port=bsread_port)
|
||||
sender = Sender(port=bsread_port, block=False, mode=PUB)
|
||||
sender.open()
|
||||
|
||||
while True:
|
||||
@@ -60,11 +60,20 @@ def accumulate(accumulator_addr, bsread_port):
|
||||
if not bsread_port:
|
||||
continue
|
||||
|
||||
sorter.add(pulse_id, results)
|
||||
to_copy = (
|
||||
"pulse_id",
|
||||
"frame",
|
||||
"is_good_frame",
|
||||
"number_of_spots",
|
||||
"saturated_pixels"
|
||||
)
|
||||
|
||||
data = {f"{detector}:{k}": results[k] for k in to_copy}
|
||||
|
||||
sorter.add(pulse_id, data)
|
||||
ready = sorter.flush_ready()
|
||||
for i in ready:
|
||||
data = {f"{detector}:{k}": v for k, v in i.items()}
|
||||
sender.send(data=data, pulse_id=pulse_id) #TODO: is there a timestamp in the input?
|
||||
for pulse_id, data in ready:
|
||||
sender.send(pulse_id=pulse_id, data=data) #TODO: is there a timestamp in the input?
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user