diff --git a/etc/recv.json b/etc/recv.json index b0b9e6c5..6cd5282b 100644 --- a/etc/recv.json +++ b/etc/recv.json @@ -30,6 +30,6 @@ "tcp://10.10.1.242:5404", "tcp://10.10.1.242:5405"], "preview_zmq_addr": "tcp://0.0.0.0:5400", - "spot_zmq_addr": "tcp://10.10.1.242:5401", + "preview_indexed_zmq_addr": "tcp://0.0.0.0:5401", "grpc_addr": "unix:/opt/jfjoch/.jfjoch-fpga-receiver" } diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 156fdb4b..e6b29941 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -28,7 +28,8 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, ImagePusher &in_image_sender, Logger &in_logger, int64_t in_forward_and_sum_nthreads, int64_t in_send_buffer_count, - ZMQPreviewPublisher* in_preview_publisher) : + ZMQPreviewPublisher* in_preview_publisher, + ZMQPreviewPublisher* in_preview_publisher_indexed) : experiment(settings.jungfraujoch_settings()), acquisition_device(in_aq_device), logger(in_logger), @@ -36,6 +37,7 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, frame_transformation_nthreads((experiment.GetSummation() >= threaded_summation_threshold) ? 2 : in_forward_and_sum_nthreads), preview_publisher(in_preview_publisher), + preview_publisher_indexed(in_preview_publisher_indexed), ndatastreams(experiment.GetDataStreamsNum()), data_acquisition_ready(ndatastreams), frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0), @@ -79,6 +81,8 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, if (experiment.GetDetectorMode() == DetectorMode::Conversion) { if (preview_publisher != nullptr) preview_publisher->Start(experiment, calib.value()); + if (preview_publisher_indexed != nullptr) + preview_publisher_indexed->Start(experiment, calib.value()); if (!GPUImageAnalysis::GPUPresent()) logger.Info("GPU support missing"); @@ -443,6 +447,10 @@ void JFJochReceiver::FrameTransformationThread() { for (int i = 0; i < recip.size(); i++) message.spots[i].indexed = indexer_result[0].indexed_spots[i]; indexer_result[0].l.Save(message.indexing_lattice); + if (preview_publisher_indexed) + preview_publisher_indexed->Publish(experiment, + transformation.GetPreview16BitImage(), + message); } else { message.indexing_result = 1; indexing_solution.AddElement(image_number, 0); @@ -616,6 +624,9 @@ void JFJochReceiver::FinalizeMeasurement() { if (preview_publisher != nullptr) preview_publisher->Stop(experiment); + if (preview_publisher_indexed != nullptr) + preview_publisher_indexed->Stop(experiment); + for (int d = 0; d < ndatastreams; d++) acquisition_device[d]->ActionAbort(); diff --git a/receiver/JFJochReceiver.h b/receiver/JFJochReceiver.h index 39a06eeb..5b71608e 100644 --- a/receiver/JFJochReceiver.h +++ b/receiver/JFJochReceiver.h @@ -53,6 +53,7 @@ class JFJochReceiver { ThreadSafeFIFO images_to_go; ZMQPreviewPublisher *preview_publisher = nullptr; + ZMQPreviewPublisher* preview_publisher_indexed = nullptr; ImagePusher &image_pusher; bool push_images_to_writer; @@ -120,7 +121,8 @@ public: ImagePusher &image_pusher, Logger &logger, int64_t forward_and_sum_nthreads, int64_t send_buffer_count, - ZMQPreviewPublisher* preview_publisher); + ZMQPreviewPublisher* preview_publisher, + ZMQPreviewPublisher* preview_publisher_indexed); ~JFJochReceiver(); JFJochReceiver(const JFJochReceiver &other) = delete; JFJochReceiver& operator=(const JFJochReceiver &other) = delete; diff --git a/receiver/JFJochReceiverService.cpp b/receiver/JFJochReceiverService.cpp index 991cbdda..b7263eb5 100644 --- a/receiver/JFJochReceiverService.cpp +++ b/receiver/JFJochReceiverService.cpp @@ -22,7 +22,7 @@ grpc::Status JFJochReceiverService::Start(grpc::ServerContext *context, const JF receiver.reset(); receiver = std::make_unique(*request, aq_devices, image_pusher, logger, nthreads, send_buffer_count, - preview_publisher); + preview_publisher, preview_publisher_indexed); try { // Don't want to stop receiver->SetDataProcessingSettings(data_processing_settings); @@ -114,6 +114,11 @@ JFJochReceiverService& JFJochReceiverService::PreviewPublisher(ZMQPreviewPublish return *this; } +JFJochReceiverService& JFJochReceiverService::PreviewPublisherIndexed(ZMQPreviewPublisher *in_preview_writer) { + preview_publisher_indexed = in_preview_writer; + return *this; +} + grpc::Status JFJochReceiverService::GetStatus(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, JFJochProtoBuf::ReceiverStatus *response) { // FPGA status can be polled outside the state mutex diff --git a/receiver/JFJochReceiverService.h b/receiver/JFJochReceiverService.h index 4f8d815e..abef96ac 100644 --- a/receiver/JFJochReceiverService.h +++ b/receiver/JFJochReceiverService.h @@ -16,6 +16,7 @@ class JFJochReceiverService final : public JFJochProtoBuf::gRPC_JFJochReceiver:: ImagePusher &image_pusher; ZMQPreviewPublisher *preview_publisher = nullptr; + ZMQPreviewPublisher *preview_publisher_indexed = nullptr; int64_t nthreads = 8; int64_t send_buffer_count = 32; @@ -31,6 +32,7 @@ public: JFJochReceiverService(std::vector &open_capi_device, Logger &logger, ImagePusher &pusher); JFJochReceiverService& PreviewPublisher(ZMQPreviewPublisher *in_preview_writer); + JFJochReceiverService& PreviewPublisherIndexed(ZMQPreviewPublisher *in_preview_writer); JFJochReceiverService& NumThreads(int64_t input); JFJochReceiverService& SendBufferCount(int64_t input); diff --git a/receiver/jfjoch_receiver.cpp b/receiver/jfjoch_receiver.cpp index f8e7a0c9..d50c7a33 100644 --- a/receiver/jfjoch_receiver.cpp +++ b/receiver/jfjoch_receiver.cpp @@ -144,13 +144,19 @@ int main(int argc, char **argv) { JFJochReceiverService service(aq_devices_ptr, logger, pusher); - std::unique_ptr preview; + std::unique_ptr preview, preview_indexed; if (input.contains("preview_zmq_addr")) { preview = std::make_unique(context, input["preview_zmq_addr"]); service.PreviewPublisher(preview.get()); logger.Info("Preview available on ZMQ addr " + input["preview_zmq_addr"].get()); } + if (input.contains("preview_indexed_zmq_addr")) { + preview_indexed = std::make_unique(context, input["preview_zmq_addr"]); + service.PreviewPublisher(preview_indexed.get()); + logger.Info("Preview available on ZMQ addr " + input["preview_indexed_zmq_addr"].get()); + } + if (input.contains("compression_threads")) { service.NumThreads(input["compression_threads"].get()); logger.Info("Compression threads {}", input["compression_threads"].get());