jfjoch_receiver: Add channel for indexed images

This commit is contained in:
2023-05-19 16:44:58 +02:00
parent eaccdf67b7
commit 0ef50e06b3
6 changed files with 31 additions and 5 deletions

View File

@@ -30,6 +30,6 @@
"tcp://10.10.1.242:5404",
"tcp://10.10.1.242:5405"],
"preview_zmq_addr": "tcp://0.0.0.0:5400",
"spot_zmq_addr": "tcp://10.10.1.242:5401",
"preview_indexed_zmq_addr": "tcp://0.0.0.0:5401",
"grpc_addr": "unix:/opt/jfjoch/.jfjoch-fpga-receiver"
}

View File

@@ -28,7 +28,8 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
ImagePusher &in_image_sender,
Logger &in_logger, int64_t in_forward_and_sum_nthreads,
int64_t in_send_buffer_count,
ZMQPreviewPublisher* in_preview_publisher) :
ZMQPreviewPublisher* in_preview_publisher,
ZMQPreviewPublisher* in_preview_publisher_indexed) :
experiment(settings.jungfraujoch_settings()),
acquisition_device(in_aq_device),
logger(in_logger),
@@ -36,6 +37,7 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
frame_transformation_nthreads((experiment.GetSummation() >= threaded_summation_threshold) ?
2 : in_forward_and_sum_nthreads),
preview_publisher(in_preview_publisher),
preview_publisher_indexed(in_preview_publisher_indexed),
ndatastreams(experiment.GetDataStreamsNum()),
data_acquisition_ready(ndatastreams),
frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0),
@@ -79,6 +81,8 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
if (experiment.GetDetectorMode() == DetectorMode::Conversion) {
if (preview_publisher != nullptr)
preview_publisher->Start(experiment, calib.value());
if (preview_publisher_indexed != nullptr)
preview_publisher_indexed->Start(experiment, calib.value());
if (!GPUImageAnalysis::GPUPresent())
logger.Info("GPU support missing");
@@ -443,6 +447,10 @@ void JFJochReceiver::FrameTransformationThread() {
for (int i = 0; i < recip.size(); i++)
message.spots[i].indexed = indexer_result[0].indexed_spots[i];
indexer_result[0].l.Save(message.indexing_lattice);
if (preview_publisher_indexed)
preview_publisher_indexed->Publish(experiment,
transformation.GetPreview16BitImage(),
message);
} else {
message.indexing_result = 1;
indexing_solution.AddElement(image_number, 0);
@@ -616,6 +624,9 @@ void JFJochReceiver::FinalizeMeasurement() {
if (preview_publisher != nullptr)
preview_publisher->Stop(experiment);
if (preview_publisher_indexed != nullptr)
preview_publisher_indexed->Stop(experiment);
for (int d = 0; d < ndatastreams; d++)
acquisition_device[d]->ActionAbort();

View File

@@ -53,6 +53,7 @@ class JFJochReceiver {
ThreadSafeFIFO<uint64_t> images_to_go;
ZMQPreviewPublisher *preview_publisher = nullptr;
ZMQPreviewPublisher* preview_publisher_indexed = nullptr;
ImagePusher &image_pusher;
bool push_images_to_writer;
@@ -120,7 +121,8 @@ public:
ImagePusher &image_pusher,
Logger &logger, int64_t forward_and_sum_nthreads,
int64_t send_buffer_count,
ZMQPreviewPublisher* preview_publisher);
ZMQPreviewPublisher* preview_publisher,
ZMQPreviewPublisher* preview_publisher_indexed);
~JFJochReceiver();
JFJochReceiver(const JFJochReceiver &other) = delete;
JFJochReceiver& operator=(const JFJochReceiver &other) = delete;

View File

@@ -22,7 +22,7 @@ grpc::Status JFJochReceiverService::Start(grpc::ServerContext *context, const JF
receiver.reset();
receiver = std::make_unique<JFJochReceiver>(*request, aq_devices, image_pusher,
logger, nthreads, send_buffer_count,
preview_publisher);
preview_publisher, preview_publisher_indexed);
try {
// Don't want to stop
receiver->SetDataProcessingSettings(data_processing_settings);
@@ -114,6 +114,11 @@ JFJochReceiverService& JFJochReceiverService::PreviewPublisher(ZMQPreviewPublish
return *this;
}
JFJochReceiverService& JFJochReceiverService::PreviewPublisherIndexed(ZMQPreviewPublisher *in_preview_writer) {
preview_publisher_indexed = in_preview_writer;
return *this;
}
grpc::Status JFJochReceiverService::GetStatus(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::ReceiverStatus *response) {
// FPGA status can be polled outside the state mutex

View File

@@ -16,6 +16,7 @@ class JFJochReceiverService final : public JFJochProtoBuf::gRPC_JFJochReceiver::
ImagePusher &image_pusher;
ZMQPreviewPublisher *preview_publisher = nullptr;
ZMQPreviewPublisher *preview_publisher_indexed = nullptr;
int64_t nthreads = 8;
int64_t send_buffer_count = 32;
@@ -31,6 +32,7 @@ public:
JFJochReceiverService(std::vector<AcquisitionDevice *> &open_capi_device,
Logger &logger, ImagePusher &pusher);
JFJochReceiverService& PreviewPublisher(ZMQPreviewPublisher *in_preview_writer);
JFJochReceiverService& PreviewPublisherIndexed(ZMQPreviewPublisher *in_preview_writer);
JFJochReceiverService& NumThreads(int64_t input);
JFJochReceiverService& SendBufferCount(int64_t input);

View File

@@ -144,13 +144,19 @@ int main(int argc, char **argv) {
JFJochReceiverService service(aq_devices_ptr, logger, pusher);
std::unique_ptr<ZMQPreviewPublisher> preview;
std::unique_ptr<ZMQPreviewPublisher> preview, preview_indexed;
if (input.contains("preview_zmq_addr")) {
preview = std::make_unique<ZMQPreviewPublisher>(context, input["preview_zmq_addr"]);
service.PreviewPublisher(preview.get());
logger.Info("Preview available on ZMQ addr " + input["preview_zmq_addr"].get<std::string>());
}
if (input.contains("preview_indexed_zmq_addr")) {
preview_indexed = std::make_unique<ZMQPreviewPublisher>(context, input["preview_zmq_addr"]);
service.PreviewPublisher(preview_indexed.get());
logger.Info("Preview available on ZMQ addr " + input["preview_indexed_zmq_addr"].get<std::string>());
}
if (input.contains("compression_threads")) {
service.NumThreads(input["compression_threads"].get<int64_t>());
logger.Info("Compression threads {}", input["compression_threads"].get<int64_t>());