diff --git a/broker/CMakeLists.txt b/broker/CMakeLists.txt index 8f0ecfe4..bb8babe8 100644 --- a/broker/CMakeLists.txt +++ b/broker/CMakeLists.txt @@ -3,7 +3,7 @@ ADD_LIBRARY(JFJochBroker STATIC JFJochServices.cpp JFJochServices.h JFJochBroker.cpp JFJochBroker.h JFJochBrokerParser.cpp JFJochBrokerParser.h) -TARGET_LINK_LIBRARIES(JFJochBroker JFJochReceiver JFJochDetector gRPCClients CommonFunctions JFJochProtoBuf) +TARGET_LINK_LIBRARIES(JFJochBroker JFJochReceiver JFJochDetector CommonFunctions JFJochProtoBuf) ADD_EXECUTABLE(jfjoch_broker jfjoch_broker.cpp) TARGET_LINK_LIBRARIES(jfjoch_broker JFJochBroker) diff --git a/broker/JFJochBrokerParser.cpp b/broker/JFJochBrokerParser.cpp index 6637a1e9..d4886466 100644 --- a/broker/JFJochBrokerParser.cpp +++ b/broker/JFJochBrokerParser.cpp @@ -234,12 +234,6 @@ void ParseBrokerConfiguration(const nlohmann::json &input, const std::string& ta if (CHECK_OBJECT(input, tag)) { auto j = input[tag]; - if (CHECK_ARRAY(j, "writer")) { - for (const auto &iter: j["writer"]) { - broker.Services().Writer(GET_STR(iter, "addr_grpc"), GET_STR(iter, "addr_zmq")); - } - } - if (j.contains("detector")) broker.Services().Detector(); } else diff --git a/broker/JFJochServices.cpp b/broker/JFJochServices.cpp index e1d0099e..081f6a93 100644 --- a/broker/JFJochServices.cpp +++ b/broker/JFJochServices.cpp @@ -8,13 +8,6 @@ JFJochServices::JFJochServices(Logger &in_logger) : logger(in_logger) {} void JFJochServices::Start(const DiffractionExperiment& experiment, const JFCalibration &calibration) { logger.Info("Measurement start for: {}", experiment.GetFilePrefix()); - if ((experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty())) { - logger.Info(" ... writer start"); - writer.Start(writer_zmq_addr, 0); - writer_running = true; - } else - writer_running = false; - if (receiver != nullptr) { logger.Info(" ... receiver start"); if (experiment.GetDetectorMode() == DetectorMode::Conversion) @@ -72,20 +65,6 @@ JFJochServicesOutput JFJochServices::Stop(const JFCalibration &calibration) { logger.Info("Receiver finished with success"); } - if (writer_running) { - logger.Info("Stopping writer"); - try { - auto stats = writer.Stop(); - logger.Info(" ... finished with success"); - for (int i = 0; i < stats.size(); i++) - logger.Info("Writer {}: Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz", - i, stats[i].nimages(), stats[i].performance_mbs(), stats[i].performance_hz()); - } catch (JFJochException &e) { - logger.Error(" ... finished with error {}",e.what()); - exception = std::make_unique(e); - } - } - if (detector) { logger.Info("Stopping detector"); try { @@ -106,7 +85,7 @@ JFJochServicesOutput JFJochServices::Stop(const JFCalibration &calibration) { void JFJochServices::Abort() { // Abort should try to achieve the best outcome possible - // but it OK if things fail (for example lost connection) + // but it is OK if things fail (for example lost connection) try { if (receiver != nullptr) receiver->Abort(); @@ -127,13 +106,6 @@ JFJochServices &JFJochServices::Receiver(JFJochReceiverService *input) { return *this; } -JFJochServices &JFJochServices::Writer(const std::string &addr, const std::string &zmq_push_addr) { - writer.AddClient(addr); - writer_zmq_addr.push_back(zmq_push_addr); - logger.Info("Using writer service with gRPC {} listening for images from ZeroMQ {}", addr, zmq_push_addr); - return *this; -} - JFJochServices &JFJochServices::Detector() { detector = std::make_unique(); logger.Info("Using detector service"); @@ -177,7 +149,3 @@ void JFJochServices::Trigger() { if (detector) detector->Trigger(); } - -size_t JFJochServices::WriterZMQCount() const { - return writer_zmq_addr.size(); -} \ No newline at end of file diff --git a/broker/JFJochServices.h b/broker/JFJochServices.h index 5a26746d..8cf01cee 100644 --- a/broker/JFJochServices.h +++ b/broker/JFJochServices.h @@ -7,7 +7,6 @@ #include "../jungfrau/JFCalibration.h" #include "../common/Logger.h" #include "../receiver/JFJochReceiverService.h" -#include "../grpc/JFJochWriterGroupClient.h" #include "../detector_control/DetectorWrapper.h" struct JFJochServicesOutput { @@ -16,12 +15,9 @@ struct JFJochServicesOutput { class JFJochServices { JFJochReceiverService *receiver = nullptr; - JFJochWriterGroupClient writer; std::unique_ptr detector; Logger &logger; - bool writer_running = false; - std::vector writer_zmq_addr; public: explicit JFJochServices(Logger &in_logger); void On(const DiffractionExperiment& experiment); @@ -38,10 +34,7 @@ public: void SetDataProcessingSettings(const DataProcessingSettings &settings); JFJochServices& Receiver(JFJochReceiverService *input); - JFJochServices& Writer(const std::string &addr, const std::string &zmq_push_addr); JFJochServices& Detector(); - - size_t WriterZMQCount() const; }; diff --git a/broker/JFJochStateMachine.h b/broker/JFJochStateMachine.h index 88fa358d..c48bc93f 100644 --- a/broker/JFJochStateMachine.h +++ b/broker/JFJochStateMachine.h @@ -8,6 +8,8 @@ #include #include +#include + #include "../common/DiffractionExperiment.h" #include "../jungfrau/JFCalibration.h" #include "../common/Logger.h" diff --git a/grpc/CMakeLists.txt b/grpc/CMakeLists.txt index 1b6bfcee..8bd23477 100644 --- a/grpc/CMakeLists.txt +++ b/grpc/CMakeLists.txt @@ -45,9 +45,3 @@ ADD_DEPENDENCIES(JFJochProtoBuf jfjoch-grpc-python) TARGET_INCLUDE_DIRECTORIES(JFJochProtoBuf PUBLIC ${CMAKE_CURRENT_BINARY_DIR}) TARGET_LINK_LIBRARIES(JFJochProtoBuf ${_GRPC_GRPCPP}) - -ADD_LIBRARY(gRPCClients STATIC - JFJochWriterClient.cpp JFJochWriterClient.h - JFJochWriterGroupClient.cpp JFJochWriterGroupClient.h) - -TARGET_LINK_LIBRARIES(gRPCClients CommonFunctions JFJochProtoBuf) \ No newline at end of file diff --git a/grpc/jfjoch.proto b/grpc/jfjoch.proto index 4e68c452..d8e93754 100644 --- a/grpc/jfjoch.proto +++ b/grpc/jfjoch.proto @@ -66,8 +66,6 @@ message Plot { repeated float y = 2; } -// DiffractionExperiment - message DatasetSettings { int64 images_per_trigger = 1; int64 ntrigger = 2; @@ -111,8 +109,6 @@ message DetectorSettings { int64 storage_cell_delay_ns = 10; } -// Calibration - message ModuleStatistics { int64 module_number = 1; int64 storage_cell_number = 2; @@ -132,7 +128,6 @@ message JFCalibrationStatistics { repeated ModuleStatistics module_statistics = 1; } -// Receiver enum PlotType { BKG_ESTIMATE = 0; RAD_INT = 1; @@ -156,25 +151,6 @@ message RadialIntegrationProfiles { repeated RadialIntegrationProfile profiles = 1; } -// Writer - -message WriterInput { - string zmq_receiver_address = 1; - int64 series_id = 2; -} - -message DataFileStatistics { - string name = 1; - int64 nimages = 2; -} - -message WriterOutput { - int64 nimages = 1; - float performance_MBs = 2; - float performance_Hz = 3; - repeated DataFileStatistics file_statistics = 4; -} - message DataProcessingSettings { float signal_to_noise_threshold = 1; // STRONG_PIXEL in XDS int64 photon_count_threshold = 2; // Threshold in photon counts @@ -188,8 +164,6 @@ message DataProcessingSettings { bool preview_indexed_only = 10; } -// Broker - message Image { bytes data = 1; int64 width = 2; @@ -244,12 +218,6 @@ message DetectorSelection { int64 id = 1; } -service gRPC_JFJochWriter { - rpc Start (WriterInput) returns (Empty) {} - rpc Abort (Empty) returns (Empty) {} - rpc Stop (Empty) returns (WriterOutput) {} -} - service gRPC_JFJochBroker { rpc Start (DatasetSettings) returns (Empty) {} rpc Stop (Empty) returns (Empty) {} diff --git a/python/jfjoch_pb2.py b/python/jfjoch_pb2.py index a86382fd..5df676f9 100644 --- a/python/jfjoch_pb2.py +++ b/python/jfjoch_pb2.py @@ -13,23 +13,23 @@ _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cjfjoch.proto\x12\x0eJFJochProtoBuf\"\x07\n\x05\x45mpty\"W\n\x08UnitCell\x12\t\n\x01\x61\x18\x01 \x01(\x02\x12\t\n\x01\x62\x18\x02 \x01(\x02\x12\t\n\x01\x63\x18\x03 \x01(\x02\x12\r\n\x05\x61lpha\x18\x04 \x01(\x02\x12\x0c\n\x04\x62\x65ta\x18\x05 \x01(\x02\x12\r\n\x05gamma\x18\x06 \x01(\x02\")\n\x06Vector\x12\t\n\x01x\x18\x01 \x01(\x02\x12\t\n\x01y\x18\x02 \x01(\x02\x12\t\n\x01z\x18\x03 \x01(\x02\"|\n\x10RotationSettings\x12\x17\n\x0fstart_angle_deg\x18\x01 \x01(\x02\x12 \n\x18\x61ngle_incr_per_image_deg\x18\x02 \x01(\x02\x12-\n\rrotation_axis\x18\x03 \x01(\x0b\x32\x16.JFJochProtoBuf.Vector\"\x1c\n\x04Plot\x12\t\n\x01x\x18\x01 \x03(\x02\x12\t\n\x01y\x18\x02 \x03(\x02\"\xb1\x04\n\x0f\x44\x61tasetSettings\x12\x1a\n\x12images_per_trigger\x18\x01 \x01(\x03\x12\x10\n\x08ntrigger\x18\x02 \x01(\x03\x12:\n\x11\x66pga_pixel_output\x18\x03 \x01(\x0e\x32\x1f.JFJochProtoBuf.FPGAPixelOutput\x12\x11\n\tsummation\x18\x04 \x01(\x03\x12\x12\n\nbeam_x_pxl\x18\x05 \x01(\x02\x12\x12\n\nbeam_y_pxl\x18\x06 \x01(\x02\x12\x1c\n\x14\x64\x65tector_distance_mm\x18\x07 \x01(\x02\x12\x19\n\x11photon_energy_keV\x18\x08 \x01(\x02\x12\x13\n\x0b\x66ile_prefix\x18\t \x01(\t\x12\x17\n\x0f\x64\x61ta_file_count\x18\n \x01(\x03\x12\x30\n\x0b\x63ompression\x18\x0b \x01(\x0e\x32\x1b.JFJochProtoBuf.Compression\x12\x13\n\x0bsample_name\x18\x0c \x01(\t\x12+\n\tunit_cell\x18\r \x01(\x0b\x32\x18.JFJochProtoBuf.UnitCell\x12\x1a\n\x12space_group_number\x18\x0e \x01(\x03\x12 \n\x18rad_int_solid_angle_corr\x18\x12 \x01(\x08\x12!\n\x19rad_int_polarization_corr\x18\x13 \x01(\x08\x12#\n\x1brad_int_polarization_factor\x18\x14 \x01(\x02\x12\x18\n\x10save_calibration\x18\x15 \x01(\x08\"\x90\x02\n\x10\x44\x65tectorSettings\x12\x15\n\rframe_time_us\x18\x01 \x01(\x03\x12\x15\n\rcount_time_us\x18\x02 \x01(\x03\x12\x1a\n\x12storage_cell_count\x18\x03 \x01(\x03\x12%\n\x1duse_internal_packet_generator\x18\x04 \x01(\x08\x12\x18\n\x10\x63ollect_raw_data\x18\x05 \x01(\x08\x12\x1a\n\x12pedestal_g0_frames\x18\x06 \x01(\x03\x12\x1a\n\x12pedestal_g1_frames\x18\x07 \x01(\x03\x12\x1a\n\x12pedestal_g2_frames\x18\x08 \x01(\x03\x12\x1d\n\x15storage_cell_delay_ns\x18\n \x01(\x03\"\xed\x01\n\x10ModuleStatistics\x12\x15\n\rmodule_number\x18\x01 \x01(\x03\x12\x1b\n\x13storage_cell_number\x18\x02 \x01(\x03\x12\x18\n\x10pedestal_g0_mean\x18\x03 \x01(\x02\x12\x18\n\x10pedestal_g1_mean\x18\x04 \x01(\x02\x12\x18\n\x10pedestal_g2_mean\x18\x05 \x01(\x02\x12\x14\n\x0cgain_g0_mean\x18\x06 \x01(\x02\x12\x14\n\x0cgain_g1_mean\x18\x07 \x01(\x02\x12\x14\n\x0cgain_g2_mean\x18\x08 \x01(\x02\x12\x15\n\rmasked_pixels\x18\t \x01(\x04\"V\n\x17JFCalibrationStatistics\x12;\n\x11module_statistics\x18\x01 \x03(\x0b\x32 .JFJochProtoBuf.ModuleStatistics\"F\n\x0bPlotRequest\x12&\n\x04type\x18\x01 \x01(\x0e\x32\x18.JFJochProtoBuf.PlotType\x12\x0f\n\x07\x62inning\x18\x02 \x01(\x04\"M\n\x18RadialIntegrationProfile\x12\r\n\x05title\x18\x01 \x01(\t\x12\"\n\x04plot\x18\x02 \x01(\x0b\x32\x14.JFJochProtoBuf.Plot\"W\n\x19RadialIntegrationProfiles\x12:\n\x08profiles\x18\x01 \x03(\x0b\x32(.JFJochProtoBuf.RadialIntegrationProfile\">\n\x0bWriterInput\x12\x1c\n\x14zmq_receiver_address\x18\x01 \x01(\t\x12\x11\n\tseries_id\x18\x02 \x01(\x03\"3\n\x12\x44\x61taFileStatistics\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0f\n\x07nimages\x18\x02 \x01(\x03\"\x8d\x01\n\x0cWriterOutput\x12\x0f\n\x07nimages\x18\x01 \x01(\x03\x12\x17\n\x0fperformance_MBs\x18\x02 \x01(\x02\x12\x16\n\x0eperformance_Hz\x18\x03 \x01(\x02\x12;\n\x0f\x66ile_statistics\x18\x04 \x03(\x0b\x32\".JFJochProtoBuf.DataFileStatistics\"\xbb\x02\n\x16\x44\x61taProcessingSettings\x12!\n\x19signal_to_noise_threshold\x18\x01 \x01(\x02\x12\x1e\n\x16photon_count_threshold\x18\x02 \x01(\x03\x12\x18\n\x10min_pix_per_spot\x18\x03 \x01(\x03\x12\x18\n\x10max_pix_per_spot\x18\x04 \x01(\x03\x12\x16\n\x0elocal_bkg_size\x18\x05 \x01(\x03\x12\x1d\n\x15high_resolution_limit\x18\x06 \x01(\x02\x12\x1c\n\x14low_resolution_limit\x18\x07 \x01(\x02\x12\x1a\n\x12\x62kg_estimate_low_q\x18\x08 \x01(\x02\x12\x1b\n\x13\x62kg_estimate_high_q\x18\t \x01(\x02\x12\x1c\n\x14preview_indexed_only\x18\n \x01(\x08\"I\n\x05Image\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\r\n\x05width\x18\x02 \x01(\x03\x12\x0e\n\x06height\x18\x03 \x01(\x03\x12\x13\n\x0bpixel_depth\x18\x04 \x01(\x03\".\n\nMaskToLoad\x12\x0c\n\x04mask\x18\x01 \x03(\r\x12\x12\n\nbit_to_set\x18\x02 \x01(\x05\"\xc9\x02\n\x15MeasurementStatistics\x12\x13\n\x0b\x66ile_prefix\x18\x01 \x01(\t\x12\x18\n\x10images_collected\x18\x02 \x01(\x03\x12\x1d\n\x15max_image_number_sent\x18\x03 \x01(\x03\x12\x1d\n\x15\x63ollection_efficiency\x18\x04 \x01(\x02\x12\x19\n\x11\x63ompression_ratio\x18\x05 \x01(\x02\x12\x11\n\tcancelled\x18\x06 \x01(\x08\x12\x19\n\x11max_receive_delay\x18\x07 \x01(\x03\x12\x15\n\rindexing_rate\x18\n \x01(\x02\x12\x16\n\x0e\x64\x65tector_width\x18\x0c \x01(\x03\x12\x17\n\x0f\x64\x65tector_height\x18\r \x01(\x03\x12\x1c\n\x14\x64\x65tector_pixel_depth\x18\x0e \x01(\x03\x12\x14\n\x0c\x62kg_estimate\x18\x10 \x01(\x02\"\x89\x01\n\x0c\x42rokerStatus\x12+\n\x0c\x62roker_state\x18\x01 \x01(\x0e\x32\x15.JFJochProtoBuf.State\x12\x10\n\x08progress\x18\x02 \x01(\x02\x12\x15\n\rindexing_rate\x18\x03 \x01(\x02\x12#\n\x1breceiver_send_buffers_avail\x18\x04 \x01(\x02\"H\n\x13\x44\x65tectorListElement\x12\x13\n\x0b\x64\x65scription\x18\x01 \x01(\t\x12\x10\n\x08nmodules\x18\x02 \x01(\x03\x12\n\n\x02id\x18\x03 \x01(\x03\"v\n\x0c\x44\x65tectorList\x12\x35\n\x08\x64\x65tector\x18\x01 \x03(\x0b\x32#.JFJochProtoBuf.DetectorListElement\x12\x12\n\ncurrent_id\x18\x02 \x01(\x03\x12\x1b\n\x13\x63urrent_description\x18\x03 \x01(\t\"\x1f\n\x11\x44\x65tectorSelection\x12\n\n\x02id\x18\x01 \x01(\x03*T\n\x0b\x43ompression\x12\r\n\tBSHUF_LZ4\x10\x00\x12\x0e\n\nBSHUF_ZSTD\x10\x01\x12\x12\n\x0e\x42SHUF_ZSTD_RLE\x10\x02\x12\x12\n\x0eNO_COMPRESSION\x10\x03*Z\n\x0c\x44\x65tectorMode\x12\x0e\n\nCONVERSION\x10\x00\x12\x07\n\x03RAW\x10\x01\x12\x0f\n\x0bPEDESTAL_G0\x10\x02\x12\x0f\n\x0bPEDESTAL_G1\x10\x03\x12\x0f\n\x0bPEDESTAL_G2\x10\x04*^\n\x05State\x12\x13\n\x0fNOT_INITIALIZED\x10\x00\x12\x08\n\x04IDLE\x10\x01\x12\x08\n\x04\x42USY\x10\x02\x12\x0c\n\x08PEDESTAL\x10\x03\x12\x13\n\x0f\x44\x41TA_COLLECTION\x10\x04\x12\t\n\x05\x45RROR\x10\x05*I\n\x0f\x46PGAPixelOutput\x12\x08\n\x04\x41UTO\x10\x00\x12\t\n\x05INT16\x10\x01\x12\n\n\x06UINT16\x10\x02\x12\t\n\x05INT32\x10\x03\x12\n\n\x06UINT32\x10\x04*{\n\x08PlotType\x12\x10\n\x0c\x42KG_ESTIMATE\x10\x00\x12\x0b\n\x07RAD_INT\x10\x01\x12\x0e\n\nSPOT_COUNT\x10\x02\x12\x11\n\rINDEXING_RATE\x10\x03\x12\x1a\n\x16INDEXING_RATE_PER_FILE\x10\x04\x12\x11\n\rADU_HISTOGRAM\x10\x05\x32\xca\x01\n\x11gRPC_JFJochWriter\x12=\n\x05Start\x12\x1b.JFJochProtoBuf.WriterInput\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12\x37\n\x05\x41\x62ort\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12=\n\x04Stop\x12\x15.JFJochProtoBuf.Empty\x1a\x1c.JFJochProtoBuf.WriterOutput\"\x00\x32\xd4\x0c\n\x11gRPC_JFJochBroker\x12\x41\n\x05Start\x12\x1f.JFJochProtoBuf.DatasetSettings\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12\x36\n\x04Stop\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12:\n\x08Pedestal\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12<\n\nInitialize\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12\x38\n\x06\x43\x61ncel\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12<\n\nDeactivate\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12\x39\n\x07Trigger\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12\x42\n\tGetStatus\x12\x15.JFJochProtoBuf.Empty\x1a\x1c.JFJochProtoBuf.BrokerStatus\"\x00\x12\\\n\x18GetCalibrationStatistics\x12\x15.JFJochProtoBuf.Empty\x1a\'.JFJochProtoBuf.JFCalibrationStatistics\"\x00\x12P\n\x13GetDetectorSettings\x12\x15.JFJochProtoBuf.Empty\x1a .JFJochProtoBuf.DetectorSettings\"\x00\x12P\n\x13PutDetectorSettings\x12 .JFJochProtoBuf.DetectorSettings\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12Z\n\x18GetMeasurementStatistics\x12\x15.JFJochProtoBuf.Empty\x1a%.JFJochProtoBuf.MeasurementStatistics\"\x00\x12\\\n\x19GetDataProcessingSettings\x12\x15.JFJochProtoBuf.Empty\x1a&.JFJochProtoBuf.DataProcessingSettings\"\x00\x12\\\n\x19PutDataProcessingSettings\x12&.JFJochProtoBuf.DataProcessingSettings\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12?\n\x08GetPlots\x12\x1b.JFJochProtoBuf.PlotRequest\x1a\x14.JFJochProtoBuf.Plot\"\x00\x12\x62\n\x1cGetRadialIntegrationProfiles\x12\x15.JFJochProtoBuf.Empty\x1a).JFJochProtoBuf.RadialIntegrationProfiles\"\x00\x12?\n\rGetPedestalG0\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Image\"\x00\x12?\n\rGetPedestalG1\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Image\"\x00\x12?\n\rGetPedestalG2\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Image\"\x00\x12\x39\n\x07GetMask\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Image\"\x00\x12H\n\x0fGetDetectorList\x12\x15.JFJochProtoBuf.Empty\x1a\x1c.JFJochProtoBuf.DetectorList\"\x00\x12L\n\x0eSelectDetector\x12!.JFJochProtoBuf.DetectorSelection\x1a\x15.JFJochProtoBuf.Empty\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cjfjoch.proto\x12\x0eJFJochProtoBuf\"\x07\n\x05\x45mpty\"W\n\x08UnitCell\x12\t\n\x01\x61\x18\x01 \x01(\x02\x12\t\n\x01\x62\x18\x02 \x01(\x02\x12\t\n\x01\x63\x18\x03 \x01(\x02\x12\r\n\x05\x61lpha\x18\x04 \x01(\x02\x12\x0c\n\x04\x62\x65ta\x18\x05 \x01(\x02\x12\r\n\x05gamma\x18\x06 \x01(\x02\")\n\x06Vector\x12\t\n\x01x\x18\x01 \x01(\x02\x12\t\n\x01y\x18\x02 \x01(\x02\x12\t\n\x01z\x18\x03 \x01(\x02\"|\n\x10RotationSettings\x12\x17\n\x0fstart_angle_deg\x18\x01 \x01(\x02\x12 \n\x18\x61ngle_incr_per_image_deg\x18\x02 \x01(\x02\x12-\n\rrotation_axis\x18\x03 \x01(\x0b\x32\x16.JFJochProtoBuf.Vector\"\x1c\n\x04Plot\x12\t\n\x01x\x18\x01 \x03(\x02\x12\t\n\x01y\x18\x02 \x03(\x02\"\xb1\x04\n\x0f\x44\x61tasetSettings\x12\x1a\n\x12images_per_trigger\x18\x01 \x01(\x03\x12\x10\n\x08ntrigger\x18\x02 \x01(\x03\x12:\n\x11\x66pga_pixel_output\x18\x03 \x01(\x0e\x32\x1f.JFJochProtoBuf.FPGAPixelOutput\x12\x11\n\tsummation\x18\x04 \x01(\x03\x12\x12\n\nbeam_x_pxl\x18\x05 \x01(\x02\x12\x12\n\nbeam_y_pxl\x18\x06 \x01(\x02\x12\x1c\n\x14\x64\x65tector_distance_mm\x18\x07 \x01(\x02\x12\x19\n\x11photon_energy_keV\x18\x08 \x01(\x02\x12\x13\n\x0b\x66ile_prefix\x18\t \x01(\t\x12\x17\n\x0f\x64\x61ta_file_count\x18\n \x01(\x03\x12\x30\n\x0b\x63ompression\x18\x0b \x01(\x0e\x32\x1b.JFJochProtoBuf.Compression\x12\x13\n\x0bsample_name\x18\x0c \x01(\t\x12+\n\tunit_cell\x18\r \x01(\x0b\x32\x18.JFJochProtoBuf.UnitCell\x12\x1a\n\x12space_group_number\x18\x0e \x01(\x03\x12 \n\x18rad_int_solid_angle_corr\x18\x12 \x01(\x08\x12!\n\x19rad_int_polarization_corr\x18\x13 \x01(\x08\x12#\n\x1brad_int_polarization_factor\x18\x14 \x01(\x02\x12\x18\n\x10save_calibration\x18\x15 \x01(\x08\"\x90\x02\n\x10\x44\x65tectorSettings\x12\x15\n\rframe_time_us\x18\x01 \x01(\x03\x12\x15\n\rcount_time_us\x18\x02 \x01(\x03\x12\x1a\n\x12storage_cell_count\x18\x03 \x01(\x03\x12%\n\x1duse_internal_packet_generator\x18\x04 \x01(\x08\x12\x18\n\x10\x63ollect_raw_data\x18\x05 \x01(\x08\x12\x1a\n\x12pedestal_g0_frames\x18\x06 \x01(\x03\x12\x1a\n\x12pedestal_g1_frames\x18\x07 \x01(\x03\x12\x1a\n\x12pedestal_g2_frames\x18\x08 \x01(\x03\x12\x1d\n\x15storage_cell_delay_ns\x18\n \x01(\x03\"\xed\x01\n\x10ModuleStatistics\x12\x15\n\rmodule_number\x18\x01 \x01(\x03\x12\x1b\n\x13storage_cell_number\x18\x02 \x01(\x03\x12\x18\n\x10pedestal_g0_mean\x18\x03 \x01(\x02\x12\x18\n\x10pedestal_g1_mean\x18\x04 \x01(\x02\x12\x18\n\x10pedestal_g2_mean\x18\x05 \x01(\x02\x12\x14\n\x0cgain_g0_mean\x18\x06 \x01(\x02\x12\x14\n\x0cgain_g1_mean\x18\x07 \x01(\x02\x12\x14\n\x0cgain_g2_mean\x18\x08 \x01(\x02\x12\x15\n\rmasked_pixels\x18\t \x01(\x04\"V\n\x17JFCalibrationStatistics\x12;\n\x11module_statistics\x18\x01 \x03(\x0b\x32 .JFJochProtoBuf.ModuleStatistics\"F\n\x0bPlotRequest\x12&\n\x04type\x18\x01 \x01(\x0e\x32\x18.JFJochProtoBuf.PlotType\x12\x0f\n\x07\x62inning\x18\x02 \x01(\x04\"M\n\x18RadialIntegrationProfile\x12\r\n\x05title\x18\x01 \x01(\t\x12\"\n\x04plot\x18\x02 \x01(\x0b\x32\x14.JFJochProtoBuf.Plot\"W\n\x19RadialIntegrationProfiles\x12:\n\x08profiles\x18\x01 \x03(\x0b\x32(.JFJochProtoBuf.RadialIntegrationProfile\"\xbb\x02\n\x16\x44\x61taProcessingSettings\x12!\n\x19signal_to_noise_threshold\x18\x01 \x01(\x02\x12\x1e\n\x16photon_count_threshold\x18\x02 \x01(\x03\x12\x18\n\x10min_pix_per_spot\x18\x03 \x01(\x03\x12\x18\n\x10max_pix_per_spot\x18\x04 \x01(\x03\x12\x16\n\x0elocal_bkg_size\x18\x05 \x01(\x03\x12\x1d\n\x15high_resolution_limit\x18\x06 \x01(\x02\x12\x1c\n\x14low_resolution_limit\x18\x07 \x01(\x02\x12\x1a\n\x12\x62kg_estimate_low_q\x18\x08 \x01(\x02\x12\x1b\n\x13\x62kg_estimate_high_q\x18\t \x01(\x02\x12\x1c\n\x14preview_indexed_only\x18\n \x01(\x08\"I\n\x05Image\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\r\n\x05width\x18\x02 \x01(\x03\x12\x0e\n\x06height\x18\x03 \x01(\x03\x12\x13\n\x0bpixel_depth\x18\x04 \x01(\x03\".\n\nMaskToLoad\x12\x0c\n\x04mask\x18\x01 \x03(\r\x12\x12\n\nbit_to_set\x18\x02 \x01(\x05\"\xc9\x02\n\x15MeasurementStatistics\x12\x13\n\x0b\x66ile_prefix\x18\x01 \x01(\t\x12\x18\n\x10images_collected\x18\x02 \x01(\x03\x12\x1d\n\x15max_image_number_sent\x18\x03 \x01(\x03\x12\x1d\n\x15\x63ollection_efficiency\x18\x04 \x01(\x02\x12\x19\n\x11\x63ompression_ratio\x18\x05 \x01(\x02\x12\x11\n\tcancelled\x18\x06 \x01(\x08\x12\x19\n\x11max_receive_delay\x18\x07 \x01(\x03\x12\x15\n\rindexing_rate\x18\n \x01(\x02\x12\x16\n\x0e\x64\x65tector_width\x18\x0c \x01(\x03\x12\x17\n\x0f\x64\x65tector_height\x18\r \x01(\x03\x12\x1c\n\x14\x64\x65tector_pixel_depth\x18\x0e \x01(\x03\x12\x14\n\x0c\x62kg_estimate\x18\x10 \x01(\x02\"\x89\x01\n\x0c\x42rokerStatus\x12+\n\x0c\x62roker_state\x18\x01 \x01(\x0e\x32\x15.JFJochProtoBuf.State\x12\x10\n\x08progress\x18\x02 \x01(\x02\x12\x15\n\rindexing_rate\x18\x03 \x01(\x02\x12#\n\x1breceiver_send_buffers_avail\x18\x04 \x01(\x02\"H\n\x13\x44\x65tectorListElement\x12\x13\n\x0b\x64\x65scription\x18\x01 \x01(\t\x12\x10\n\x08nmodules\x18\x02 \x01(\x03\x12\n\n\x02id\x18\x03 \x01(\x03\"v\n\x0c\x44\x65tectorList\x12\x35\n\x08\x64\x65tector\x18\x01 \x03(\x0b\x32#.JFJochProtoBuf.DetectorListElement\x12\x12\n\ncurrent_id\x18\x02 \x01(\x03\x12\x1b\n\x13\x63urrent_description\x18\x03 \x01(\t\"\x1f\n\x11\x44\x65tectorSelection\x12\n\n\x02id\x18\x01 \x01(\x03*T\n\x0b\x43ompression\x12\r\n\tBSHUF_LZ4\x10\x00\x12\x0e\n\nBSHUF_ZSTD\x10\x01\x12\x12\n\x0e\x42SHUF_ZSTD_RLE\x10\x02\x12\x12\n\x0eNO_COMPRESSION\x10\x03*Z\n\x0c\x44\x65tectorMode\x12\x0e\n\nCONVERSION\x10\x00\x12\x07\n\x03RAW\x10\x01\x12\x0f\n\x0bPEDESTAL_G0\x10\x02\x12\x0f\n\x0bPEDESTAL_G1\x10\x03\x12\x0f\n\x0bPEDESTAL_G2\x10\x04*^\n\x05State\x12\x13\n\x0fNOT_INITIALIZED\x10\x00\x12\x08\n\x04IDLE\x10\x01\x12\x08\n\x04\x42USY\x10\x02\x12\x0c\n\x08PEDESTAL\x10\x03\x12\x13\n\x0f\x44\x41TA_COLLECTION\x10\x04\x12\t\n\x05\x45RROR\x10\x05*I\n\x0f\x46PGAPixelOutput\x12\x08\n\x04\x41UTO\x10\x00\x12\t\n\x05INT16\x10\x01\x12\n\n\x06UINT16\x10\x02\x12\t\n\x05INT32\x10\x03\x12\n\n\x06UINT32\x10\x04*{\n\x08PlotType\x12\x10\n\x0c\x42KG_ESTIMATE\x10\x00\x12\x0b\n\x07RAD_INT\x10\x01\x12\x0e\n\nSPOT_COUNT\x10\x02\x12\x11\n\rINDEXING_RATE\x10\x03\x12\x1a\n\x16INDEXING_RATE_PER_FILE\x10\x04\x12\x11\n\rADU_HISTOGRAM\x10\x05\x32\xd4\x0c\n\x11gRPC_JFJochBroker\x12\x41\n\x05Start\x12\x1f.JFJochProtoBuf.DatasetSettings\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12\x36\n\x04Stop\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12:\n\x08Pedestal\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12<\n\nInitialize\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12\x38\n\x06\x43\x61ncel\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12<\n\nDeactivate\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12\x39\n\x07Trigger\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12\x42\n\tGetStatus\x12\x15.JFJochProtoBuf.Empty\x1a\x1c.JFJochProtoBuf.BrokerStatus\"\x00\x12\\\n\x18GetCalibrationStatistics\x12\x15.JFJochProtoBuf.Empty\x1a\'.JFJochProtoBuf.JFCalibrationStatistics\"\x00\x12P\n\x13GetDetectorSettings\x12\x15.JFJochProtoBuf.Empty\x1a .JFJochProtoBuf.DetectorSettings\"\x00\x12P\n\x13PutDetectorSettings\x12 .JFJochProtoBuf.DetectorSettings\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12Z\n\x18GetMeasurementStatistics\x12\x15.JFJochProtoBuf.Empty\x1a%.JFJochProtoBuf.MeasurementStatistics\"\x00\x12\\\n\x19GetDataProcessingSettings\x12\x15.JFJochProtoBuf.Empty\x1a&.JFJochProtoBuf.DataProcessingSettings\"\x00\x12\\\n\x19PutDataProcessingSettings\x12&.JFJochProtoBuf.DataProcessingSettings\x1a\x15.JFJochProtoBuf.Empty\"\x00\x12?\n\x08GetPlots\x12\x1b.JFJochProtoBuf.PlotRequest\x1a\x14.JFJochProtoBuf.Plot\"\x00\x12\x62\n\x1cGetRadialIntegrationProfiles\x12\x15.JFJochProtoBuf.Empty\x1a).JFJochProtoBuf.RadialIntegrationProfiles\"\x00\x12?\n\rGetPedestalG0\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Image\"\x00\x12?\n\rGetPedestalG1\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Image\"\x00\x12?\n\rGetPedestalG2\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Image\"\x00\x12\x39\n\x07GetMask\x12\x15.JFJochProtoBuf.Empty\x1a\x15.JFJochProtoBuf.Image\"\x00\x12H\n\x0fGetDetectorList\x12\x15.JFJochProtoBuf.Empty\x1a\x1c.JFJochProtoBuf.DetectorList\"\x00\x12L\n\x0eSelectDetector\x12!.JFJochProtoBuf.DetectorSelection\x1a\x15.JFJochProtoBuf.Empty\"\x00\x62\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'jfjoch_pb2', globals()) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _COMPRESSION._serialized_start=3137 - _COMPRESSION._serialized_end=3221 - _DETECTORMODE._serialized_start=3223 - _DETECTORMODE._serialized_end=3313 - _STATE._serialized_start=3315 - _STATE._serialized_end=3409 - _FPGAPIXELOUTPUT._serialized_start=3411 - _FPGAPIXELOUTPUT._serialized_end=3484 - _PLOTTYPE._serialized_start=3486 - _PLOTTYPE._serialized_end=3609 + _COMPRESSION._serialized_start=2876 + _COMPRESSION._serialized_end=2960 + _DETECTORMODE._serialized_start=2962 + _DETECTORMODE._serialized_end=3052 + _STATE._serialized_start=3054 + _STATE._serialized_end=3148 + _FPGAPIXELOUTPUT._serialized_start=3150 + _FPGAPIXELOUTPUT._serialized_end=3223 + _PLOTTYPE._serialized_start=3225 + _PLOTTYPE._serialized_end=3348 _EMPTY._serialized_start=32 _EMPTY._serialized_end=39 _UNITCELL._serialized_start=41 @@ -54,30 +54,22 @@ if _descriptor._USE_C_DESCRIPTORS == False: _RADIALINTEGRATIONPROFILE._serialized_end=1645 _RADIALINTEGRATIONPROFILES._serialized_start=1647 _RADIALINTEGRATIONPROFILES._serialized_end=1734 - _WRITERINPUT._serialized_start=1736 - _WRITERINPUT._serialized_end=1798 - _DATAFILESTATISTICS._serialized_start=1800 - _DATAFILESTATISTICS._serialized_end=1851 - _WRITEROUTPUT._serialized_start=1854 - _WRITEROUTPUT._serialized_end=1995 - _DATAPROCESSINGSETTINGS._serialized_start=1998 - _DATAPROCESSINGSETTINGS._serialized_end=2313 - _IMAGE._serialized_start=2315 - _IMAGE._serialized_end=2388 - _MASKTOLOAD._serialized_start=2390 - _MASKTOLOAD._serialized_end=2436 - _MEASUREMENTSTATISTICS._serialized_start=2439 - _MEASUREMENTSTATISTICS._serialized_end=2768 - _BROKERSTATUS._serialized_start=2771 - _BROKERSTATUS._serialized_end=2908 - _DETECTORLISTELEMENT._serialized_start=2910 - _DETECTORLISTELEMENT._serialized_end=2982 - _DETECTORLIST._serialized_start=2984 - _DETECTORLIST._serialized_end=3102 - _DETECTORSELECTION._serialized_start=3104 - _DETECTORSELECTION._serialized_end=3135 - _GRPC_JFJOCHWRITER._serialized_start=3612 - _GRPC_JFJOCHWRITER._serialized_end=3814 - _GRPC_JFJOCHBROKER._serialized_start=3817 - _GRPC_JFJOCHBROKER._serialized_end=5437 + _DATAPROCESSINGSETTINGS._serialized_start=1737 + _DATAPROCESSINGSETTINGS._serialized_end=2052 + _IMAGE._serialized_start=2054 + _IMAGE._serialized_end=2127 + _MASKTOLOAD._serialized_start=2129 + _MASKTOLOAD._serialized_end=2175 + _MEASUREMENTSTATISTICS._serialized_start=2178 + _MEASUREMENTSTATISTICS._serialized_end=2507 + _BROKERSTATUS._serialized_start=2510 + _BROKERSTATUS._serialized_end=2647 + _DETECTORLISTELEMENT._serialized_start=2649 + _DETECTORLISTELEMENT._serialized_end=2721 + _DETECTORLIST._serialized_start=2723 + _DETECTORLIST._serialized_end=2841 + _DETECTORSELECTION._serialized_start=2843 + _DETECTORSELECTION._serialized_end=2874 + _GRPC_JFJOCHBROKER._serialized_start=3351 + _GRPC_JFJOCHBROKER._serialized_end=4971 # @@protoc_insertion_point(module_scope) diff --git a/python/jfjoch_pb2_grpc.py b/python/jfjoch_pb2_grpc.py index b3e11cda..d7a6328c 100644 --- a/python/jfjoch_pb2_grpc.py +++ b/python/jfjoch_pb2_grpc.py @@ -5,133 +5,6 @@ import grpc import jfjoch_pb2 as jfjoch__pb2 -class gRPC_JFJochWriterStub(object): - """Missing associated documentation comment in .proto file.""" - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Start = channel.unary_unary( - '/JFJochProtoBuf.gRPC_JFJochWriter/Start', - request_serializer=jfjoch__pb2.WriterInput.SerializeToString, - response_deserializer=jfjoch__pb2.Empty.FromString, - ) - self.Abort = channel.unary_unary( - '/JFJochProtoBuf.gRPC_JFJochWriter/Abort', - request_serializer=jfjoch__pb2.Empty.SerializeToString, - response_deserializer=jfjoch__pb2.Empty.FromString, - ) - self.Stop = channel.unary_unary( - '/JFJochProtoBuf.gRPC_JFJochWriter/Stop', - request_serializer=jfjoch__pb2.Empty.SerializeToString, - response_deserializer=jfjoch__pb2.WriterOutput.FromString, - ) - - -class gRPC_JFJochWriterServicer(object): - """Missing associated documentation comment in .proto file.""" - - def Start(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def Abort(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def Stop(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_gRPC_JFJochWriterServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Start': grpc.unary_unary_rpc_method_handler( - servicer.Start, - request_deserializer=jfjoch__pb2.WriterInput.FromString, - response_serializer=jfjoch__pb2.Empty.SerializeToString, - ), - 'Abort': grpc.unary_unary_rpc_method_handler( - servicer.Abort, - request_deserializer=jfjoch__pb2.Empty.FromString, - response_serializer=jfjoch__pb2.Empty.SerializeToString, - ), - 'Stop': grpc.unary_unary_rpc_method_handler( - servicer.Stop, - request_deserializer=jfjoch__pb2.Empty.FromString, - response_serializer=jfjoch__pb2.WriterOutput.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'JFJochProtoBuf.gRPC_JFJochWriter', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. -class gRPC_JFJochWriter(object): - """Missing associated documentation comment in .proto file.""" - - @staticmethod - def Start(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochWriter/Start', - jfjoch__pb2.WriterInput.SerializeToString, - jfjoch__pb2.Empty.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def Abort(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochWriter/Abort', - jfjoch__pb2.Empty.SerializeToString, - jfjoch__pb2.Empty.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def Stop(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochWriter/Stop', - jfjoch__pb2.Empty.SerializeToString, - jfjoch__pb2.WriterOutput.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - class gRPC_JFJochBrokerStub(object): """Missing associated documentation comment in .proto file.""" diff --git a/tests/JFJochFullIntegrationTest.cpp b/tests/JFJochFullIntegrationTest.cpp index 0c8b3f13..e6d26561 100644 --- a/tests/JFJochFullIntegrationTest.cpp +++ b/tests/JFJochFullIntegrationTest.cpp @@ -1,11 +1,11 @@ // Copyright (2019-2023) Paul Scherrer Institute #include +#include #include "../grpc/gRPCServer_Template.h" #include "../broker/JFJochStateMachine.h" -#include "../writer/JFJochWriterService.h" -#include "../receiver/JFJochReceiverService.h" +#include "../writer/StreamWriter.h" #include "FPGAUnitTest.h" #include "../acquisition_device/MockAcquisitionDevice.h" #include "../common/ZMQImagePusher.h" @@ -40,15 +40,14 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") { state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0); - services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver); + services.Receiver(&fpga_receiver); logger.Verbose(true); std::vector image(RAW_MODULE_SIZE); - JFJochWriterService writer(zmq_context, logger); - - auto writer_server = gRPCServer("unix:writer_test", writer); + StreamWriter writer(zmq_context, logger, "inproc://#1"); + auto writer_future = writer.RunFuture(); REQUIRE_NOTHROW(state_machine.Initialize()); logger.Info("Initialized"); @@ -95,10 +94,9 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") { REQUIRE(statistics.detector_width() == 2068); REQUIRE(statistics.detector_height() == 2164); REQUIRE(statistics.detector_pixel_depth() == 2); - writer_server->Shutdown(); + writer_future.get(); } - TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") { Logger logger("JFJochIntegrationTest_ZMQ_save_calibration"); ZMQContext zmq_context; @@ -127,15 +125,14 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") { state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0); - services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver); + services.Receiver(&fpga_receiver); logger.Verbose(true); std::vector image(RAW_MODULE_SIZE);; - JFJochWriterService writer(zmq_context, logger); - - auto writer_server = gRPCServer("unix:writer_test", writer); + StreamWriter writer(zmq_context, logger, "inproc://#1"); + auto writer_future = writer.RunFuture(); REQUIRE_NOTHROW(state_machine.Initialize()); logger.Info("Initialized"); @@ -183,7 +180,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") { REQUIRE(statistics.detector_width() == 2068); REQUIRE(statistics.detector_height() == 2164); REQUIRE(statistics.detector_pixel_depth() == 2); - writer_server->Shutdown(); + REQUIRE_NOTHROW(writer_future.get()); } TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]") { @@ -215,15 +212,14 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]") state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0); - services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver); + services.Receiver(&fpga_receiver); logger.Verbose(true); std::vector image(RAW_MODULE_SIZE); - JFJochWriterService writer(zmq_context, logger); - - auto writer_server = gRPCServer("unix:writer_test", writer); + StreamWriter writer(zmq_context, logger, "inproc://#1"); + auto writer_future = writer.RunFuture(); REQUIRE_NOTHROW(state_machine.Initialize()); logger.Info("Initialized"); @@ -271,7 +267,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]") REQUIRE(statistics.detector_height() == 2164); REQUIRE(statistics.detector_pixel_depth() == 2); - writer_server->Shutdown(); + REQUIRE_NOTHROW(writer_future.get()); } TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") { @@ -292,7 +288,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") { state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); state_machine.NotThreadSafe_Experiment().Mode(DetectorMode::Conversion); - services.Writer("unix:writer_test", "inproc://#1"); logger.Verbose(true); @@ -309,9 +304,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") { JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); services.Receiver(&fpga_receiver); - JFJochWriterService writer(zmq_context, logger); - - auto writer_server = gRPCServer("unix:writer_test", writer); + StreamWriter writer(zmq_context, logger, "inproc://#1"); + auto writer_future = writer.RunFuture(); JFJochProtoBuf::DetectorSettings detector_settings; detector_settings.set_frame_time_us(500); @@ -362,8 +356,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") { REQUIRE(statistics.detector_height() == 8 * 512); REQUIRE(statistics.detector_pixel_depth() == 2); - - writer_server->Shutdown(); + REQUIRE_NOTHROW(writer_future.get()); } @@ -383,10 +376,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") { state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0); - services - .Writer("unix:writer_test_0", "inproc://#0") - .Writer("unix:writer_test_1", "inproc://#1") - .Writer("unix:writer_test_2", "inproc://#2"); logger.Verbose(true); @@ -403,13 +392,14 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") { JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); services.Receiver(&fpga_receiver); - JFJochWriterService writer_0(zmq_context, logger); - JFJochWriterService writer_1(zmq_context, logger); - JFJochWriterService writer_2(zmq_context, logger); + StreamWriter writer_0(zmq_context, logger, "inproc://#0"); + auto writer_0_future = writer_0.RunFuture(); - auto writer_server_0 = gRPCServer("unix:writer_test_0", writer_0); - auto writer_server_1 = gRPCServer("unix:writer_test_1", writer_1); - auto writer_server_2 = gRPCServer("unix:writer_test_2", writer_2); + StreamWriter writer_1(zmq_context, logger, "inproc://#1"); + auto writer_1_future = writer_1.RunFuture(); + + StreamWriter writer_2(zmq_context, logger, "inproc://#2"); + auto writer_2_future = writer_2.RunFuture(); REQUIRE_NOTHROW(state_machine.Initialize()); logger.Info("Initialized"); @@ -451,13 +441,11 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") { REQUIRE(statistics.collection_efficiency() == 1.0); REQUIRE(statistics.images_collected() == nimages); - - writer_server_0->Shutdown(); - writer_server_1->Shutdown(); - writer_server_2->Shutdown(); + REQUIRE_NOTHROW(writer_0_future.get()); + REQUIRE_NOTHROW(writer_1_future.get()); + REQUIRE_NOTHROW(writer_2_future.get()); } - TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") { Logger logger("JFJochIntegrationTest_Cancel"); @@ -475,7 +463,6 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") { state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0); - services.Writer("unix:writer_test", "inproc://#1"); logger.Verbose(true); @@ -497,9 +484,8 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") { JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); services.Receiver(&fpga_receiver); - JFJochWriterService writer(zmq_context, logger); - - auto writer_server = gRPCServer("unix:writer_test", writer); + StreamWriter writer(zmq_context, logger, "inproc://#1"); + auto writer_future = writer.RunFuture(); REQUIRE_NOTHROW(state_machine.Initialize()); logger.Info("Initialized"); @@ -529,8 +515,7 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") { REQUIRE(statistics.max_image_number_sent() == 4); REQUIRE(statistics.cancelled()); - - writer_server->Shutdown(); + REQUIRE_NOTHROW(writer_future.get()); } TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") { @@ -549,7 +534,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") { state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0).PreviewPeriod( 5ms); - services.Writer("unix:writer_test", "inproc://#1"); logger.Verbose(true); @@ -573,7 +557,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") { JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); services.Receiver(&fpga_receiver); - JFJochWriterService writer(zmq_context, logger); + StreamWriter writer(zmq_context, logger, "inproc://#1"); + auto writer_future = writer.RunFuture(); ZMQPreviewPublisher preview(zmq_context, "inproc://#2"); fpga_receiver.PreviewPublisher(&preview).NumThreads(1); @@ -582,8 +567,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") { REQUIRE_NOTHROW(rcv_preview_socket.Connect("inproc://#2")); rcv_preview_socket.SubscribeAll(); - auto writer_server = gRPCServer("unix:writer_test", writer); - REQUIRE_NOTHROW(state_machine.Initialize()); logger.Info("Initialized"); @@ -633,8 +616,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") { REQUIRE(statistics.collection_efficiency() == 1.0); REQUIRE(statistics.images_collected() == 5); - - writer_server->Shutdown(); + REQUIRE_NOTHROW(writer_future.get()); } TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]") { @@ -653,7 +635,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]" state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0).PreviewPeriod( 5ms); - services.Writer("unix:writer_test", "inproc://#1"); logger.Verbose(true); @@ -676,8 +657,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]" JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); services.Receiver(&fpga_receiver); - JFJochWriterService writer(zmq_context, logger); - ZMQPreviewPublisher preview(zmq_context, "inproc://#2"); fpga_receiver.PreviewPublisher(&preview).NumThreads(1); @@ -685,8 +664,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]" REQUIRE_NOTHROW(rcv_preview_socket.Connect("inproc://#2")); rcv_preview_socket.SubscribeAll(); - auto writer_server = gRPCServer("unix:writer_test", writer); - REQUIRE_NOTHROW(state_machine.Initialize()); logger.Info("Initialized"); @@ -734,8 +711,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]" REQUIRE(statistics.collection_efficiency() == 1.0); REQUIRE(statistics.images_collected() == 5); - - writer_server->Shutdown(); } TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") { @@ -753,7 +728,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") { state_machine.AddDetectorSetup(DetectorGeometry(ndatastream * nmodules, 2, 8, 36)); state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0); - services.Writer("unix:writer_test", "inproc://#1"); logger.Verbose(true); @@ -796,9 +770,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") { JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); services.Receiver(&fpga_receiver); - JFJochWriterService writer(zmq_context, logger); - - auto writer_server = gRPCServer("unix:writer_test", writer); + StreamWriter writer(zmq_context, logger, "inproc://#1"); + auto writer_future = writer.RunFuture(); REQUIRE_NOTHROW(state_machine.Initialize()); logger.Info("Initialized"); @@ -842,8 +815,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") { REQUIRE(statistics.collection_efficiency() == 1.0); REQUIRE(statistics.images_collected() == 1); - - writer_server->Shutdown(); + + REQUIRE_NOTHROW(writer_future.get()); } /* TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver]") { @@ -979,7 +952,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") { state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0); state_machine.NotThreadSafe_Experiment().LowQForRadialInt_recipA(0.5).HighQForRadialInt_recipA(3.5) .QSpacingForRadialInt_recipA(1.0); - services.Writer("unix:writer_test", "inproc://#1"); logger.Verbose(true); @@ -1027,8 +999,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") { JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); services.Receiver(&fpga_receiver); - JFJochWriterService writer(zmq_context, logger); - auto writer_server = gRPCServer("unix:writer_test", writer); + StreamWriter writer(zmq_context, logger, "inproc://#1"); + auto writer_future = writer.RunFuture(); REQUIRE_NOTHROW(state_machine.Initialize()); logger.Info("Initialized"); @@ -1070,8 +1042,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") { CHECK(plot_map[0].title() == "dataset"); CHECK(plot_map[0].plot().x_size() == 3); - - writer_server->Shutdown(); + + REQUIRE_NOTHROW(writer_future.get()); } /* diff --git a/tests/StreamWriterTest.cpp b/tests/StreamWriterTest.cpp index ee77ceff..55583c52 100644 --- a/tests/StreamWriterTest.cpp +++ b/tests/StreamWriterTest.cpp @@ -3,12 +3,9 @@ #include #include -#include "../writer/JFJochWriterService.h" #include "../writer/StreamWriter.h" -#include "../writer/HDF5Objects.h" #include "../common/ZMQImagePusher.h" #include "../receiver/JFJochReceiverService.h" -#include "../acquisition_device/HLSSimulatedDevice.h" TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") { RegisterHDF5Filter(); @@ -17,9 +14,6 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") { ZMQContext context; std::string zmq_addr = "inproc://#1"; - grpc::ServerContext grpc_context; - JFJochProtoBuf::Empty empty; - DiffractionExperiment x(DetectorGeometry(2)); x.FilePrefix("subdir/JFJochWriterTest").NumTriggers(1).ImagesPerTrigger(5) .UseInternalPacketGenerator(true).Mode(DetectorMode::Raw).PedestalG0Frames(0); @@ -62,47 +56,3 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") { REQUIRE(std::filesystem::remove("subdir/JFJochWriterTest_data_000.h5")); REQUIRE(std::filesystem::remove("subdir")); } - - -TEST_CASE("JFJochWriterServiceTest_ZMQ","[JFJochWriter]") { - RegisterHDF5Filter(); - - Logger logger("test"); - ZMQContext context; - std::string zmq_addr = "inproc://#1"; - - JFJochWriterService writer(context, logger); - - grpc::ServerContext grpc_context; - JFJochProtoBuf::Empty empty; - - DiffractionExperiment x(DetectorGeometry(2)); - x.FilePrefix("JFJochWriterTest").NumTriggers(1).ImagesPerTrigger(5) - .UseInternalPacketGenerator(true) - .Mode(DetectorMode::Raw).PedestalG0Frames(0); - - JFModuleGainCalibration empty_gain; - - AcquisitionDeviceGroup aq_devices; - for (int i = 0; i < x.GetDataStreamsNum(); i++) - aq_devices.AddHLSDevice(64); - - ZMQImagePusher pusher (context, {zmq_addr}); - JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher); - - JFJochProtoBuf::WriterInput writer_input; - writer_input.set_zmq_receiver_address(zmq_addr); - - JFJochReceiverOutput receiver_output; - JFJochProtoBuf::WriterOutput writer_output; - - REQUIRE(x.GetImageNum() == 5); - REQUIRE(writer.Start(&grpc_context, &writer_input, &empty).ok()); - - REQUIRE_NOTHROW(fpga_receiver_service.Start(x, nullptr)); - REQUIRE_NOTHROW(receiver_output = fpga_receiver_service.Stop()); - REQUIRE(writer.Stop(&grpc_context, &empty, &writer_output).ok()); - - REQUIRE(writer_output.nimages() == 5); - //TODO: Check contest of HDF5 file -} diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 3c6c95db..0d5064f0 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -17,5 +17,8 @@ TARGET_LINK_LIBRARIES(PreviewTest JFJochWriter CommonFunctions) ADD_EXECUTABLE(JFCalibrationPerfTest JFCalibrationPerfTest.cpp) TARGET_LINK_LIBRARIES(JFCalibrationPerfTest CommonFunctions) +ADD_EXECUTABLE(jfjoch_writer_test jfjoch_writer_test.cpp) +TARGET_LINK_LIBRARIES(jfjoch_writer_test JFJochWriter CommonFunctions) + INSTALL(TARGETS jfjoch_udp_simulator CompressionBenchmark HDF5DatasetWriteTest DataAnalysisPerfTest - PreviewTest JFCalibrationPerfTest RUNTIME) \ No newline at end of file + PreviewTest jfjoch_writer_test JFCalibrationPerfTest RUNTIME) diff --git a/writer/jfjoch_writer_test.cpp b/tools/jfjoch_writer_test.cpp similarity index 83% rename from writer/jfjoch_writer_test.cpp rename to tools/jfjoch_writer_test.cpp index afa6c352..6d49b28a 100644 --- a/writer/jfjoch_writer_test.cpp +++ b/tools/jfjoch_writer_test.cpp @@ -2,27 +2,28 @@ #include #include -#include "HDF5Objects.h" +#include "../writer/HDF5Objects.h" #include "../common/Logger.h" #include "../common/FrameTransformation.h" #include "../common/RawToConvertedGeometry.h" #include "../common/ZMQImagePusher.h" -#include "../grpc/JFJochWriterGroupClient.h" #define BASE_TCP_PORT 8000 int main(int argc, char **argv) { + Logger logger("jfjoch_writer_test"); RegisterHDF5Filter(); - if (argc < 4) { - std::cout << "Usage: ./jfjoch_writer_test <#images> " << std::endl; + if (argc != 4) { + std::cout << "Usage: ./jfjoch_writer_test <#images> <# of sockets>" << std::endl; std::cout << std::endl; exit(EXIT_FAILURE); } int64_t nimages_out = atoi(argv[2]); + int64_t nsockets = atoi(argv[3]); DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36)); x.Summation(1).ImagesPerTrigger(nimages_out).Mode(DetectorMode::Conversion); @@ -49,14 +50,11 @@ int main(int argc, char **argv) { ZMQContext context; context.NumThreads(4); - JFJochWriterGroupClient client; - std::vector zmq_addr; - for (int i = 3; i < argc; i++) { + for (int i = 0; i < nsockets; i++) zmq_addr.emplace_back("tcp://0.0.0.0:" + std::to_string(BASE_TCP_PORT + i)); - client.AddClient(argv[i]); - } - x.DataFileCount(zmq_addr.size()); + + x.DataFileCount(nsockets); ZMQImagePusher pusher(context, zmq_addr); @@ -80,14 +78,11 @@ int main(int argc, char **argv) { for (int j = 0; j < x.GetModulesNum(); j++) transformation.ProcessModule(image_tmp_raw.data() + j * RAW_MODULE_SIZE, j, 0); - transformation.Pack(); output_size[i] = transformation.SaveCompressedImage((uint8_t *) output[i].data()); } logger.Info("Sending {} images", nimages_out); - client.Start(zmq_addr, 0); - std::vector empty_spot_vector; std::vector send_buffer(x.GetPixelsNum() * x.GetPixelDepth() * 2); JFJochFrameSerializer serializer(send_buffer.data(), send_buffer.size()); @@ -95,7 +90,9 @@ int main(int argc, char **argv) { StartMessage start_message; x.FillMessage(start_message); - pusher.StartDataCollection(start_message); + serializer.SerializeSequenceStart(start_message); + + pusher.StartDataCollection(send_buffer.data(), serializer.GetBufferSize(), x.GetDataFileCount()); for (int i = 0; i < nimages_out; i++) { DataMessage data_message; @@ -109,9 +106,4 @@ int main(int argc, char **argv) { pusher.EndDataCollection(end_message); logger.Info("Sending done"); - auto stats = client.Stop(); - logger.Info("Writing done"); - for (int i = 0; i < stats.size(); i++) - logger.Info("Writer {}: Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz", - i, stats[i].nimages(), stats[i].performance_mbs(), stats[i].performance_hz()); } diff --git a/writer/CMakeLists.txt b/writer/CMakeLists.txt index 28e1d5b5..8a358876 100644 --- a/writer/CMakeLists.txt +++ b/writer/CMakeLists.txt @@ -5,25 +5,19 @@ TARGET_LINK_LIBRARIES(HDF5Wrappers Compression ${HDF5_LIBRARIES}) TARGET_INCLUDE_DIRECTORIES(HDF5Wrappers PUBLIC ${HDF5_INCLUDE_DIRS}) ADD_EXECUTABLE(jfjoch_writer jfjoch_writer.cpp) -ADD_EXECUTABLE(jfjoch_writer_multi jfjoch_writer_multi.cpp) ADD_LIBRARY(JFJochWriter STATIC HDF5DataFile.h HDF5DataFile.cpp HDF5NXmx.cpp HDF5NXmx.h HDF5Writer.cpp HDF5Writer.h ZMQImagePuller.cpp ZMQImagePuller.h - StreamWriter.cpp StreamWriter.h - JFJochWriterService.cpp JFJochWriterService.h) + StreamWriter.cpp StreamWriter.h) -TARGET_LINK_LIBRARIES(JFJochWriter HDF5Wrappers CommonFunctions JFJochProtoBuf) +TARGET_LINK_LIBRARIES(JFJochWriter HDF5Wrappers CommonFunctions JFJochProtoBuf) TARGET_LINK_LIBRARIES(jfjoch_writer JFJochWriter) -TARGET_LINK_LIBRARIES(jfjoch_writer_multi JFJochWriter) -ADD_EXECUTABLE(jfjoch_writer_test jfjoch_writer_test.cpp) -TARGET_LINK_LIBRARIES(jfjoch_writer_test JFJochWriter gRPCClients CommonFunctions) - -INSTALL(TARGETS jfjoch_writer jfjoch_writer_multi jfjoch_writer_test RUNTIME) +INSTALL(TARGETS jfjoch_writer RUNTIME) ADD_EXECUTABLE(HDF5Sum HDF5Sum.cpp) TARGET_LINK_LIBRARIES(HDF5Sum HDF5Wrappers) \ No newline at end of file diff --git a/writer/JFJochWriterService.cpp b/writer/JFJochWriterService.cpp deleted file mode 100644 index 5633c325..00000000 --- a/writer/JFJochWriterService.cpp +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright (2019-2023) Paul Scherrer Institute - -#include "JFJochWriterService.h" - -#include - -JFJochWriterService::JFJochWriterService(ZMQContext &in_context, Logger &in_logger) : -logger(in_logger), zmq_context(in_context) {} - -grpc::Status JFJochWriterService::Start(grpc::ServerContext *context, const JFJochProtoBuf::WriterInput *request, - JFJochProtoBuf::Empty *response) { - std::unique_lock ul(m); - try { - if (writer) - writer.reset(); - logger.Info("Starting writer"); - - writer = std::make_unique(zmq_context, logger, request->zmq_receiver_address()); - writer_future = std::async(std::launch::async, &StreamWriter::Run, writer.get()); - - logger.Info(" ... done."); - return grpc::Status::OK; - } catch (JFJochException &e) { - logger.ErrorException(e); - return {grpc::StatusCode::ABORTED, e.what()}; - } -} - -grpc::Status JFJochWriterService::Stop(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, - JFJochProtoBuf::WriterOutput *response) { - try { - { - std::shared_lock ul(m); - logger.Info("Stopping writer"); - if (writer_future.valid()) { - auto ret = writer_future.get(); - response->set_nimages(ret.image_puller_stats.processed_images); - response->set_performance_mbs(ret.image_puller_stats.performance_MBs); - response->set_performance_hz(ret.image_puller_stats.performance_Hz); - for (const auto &f: ret.data_file_stats) { - auto *tmp = response->add_file_statistics(); - tmp->set_name(f.filename); - tmp->set_nimages(f.max_image_number + 1); - } - } else { - response->set_nimages(0); - response->set_performance_mbs(0); - response->set_performance_hz(0); - } - } - { - std::unique_lock ul(m); - if (writer) - writer.reset(); - } - logger.Info("Done"); - return grpc::Status::OK; - } catch (JFJochException &e) { - return {grpc::StatusCode::ABORTED, e.what()}; - } -} - -grpc::Status JFJochWriterService::Abort(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, - JFJochProtoBuf::Empty *response) { - std::shared_lock ul(m); - if (writer) - writer->Abort(); - return grpc::Status::OK; -} - - -JFJochWriterService &JFJochWriterService::BaseDirectory(const std::string &input) { - logger.Info("Setting base directory to " + input); - try { - std::filesystem::current_path(input); - } catch (const std::filesystem::filesystem_error& err) { - logger.Error("Cannot set base directory: " + std::string(err.what())); - throw JFJochException(JFJochExceptionCategory::FileWriteError, - "Cannot set base directory " + std::string(err.what())); - } - return *this; -} \ No newline at end of file diff --git a/writer/JFJochWriterService.h b/writer/JFJochWriterService.h deleted file mode 100644 index 31240d0d..00000000 --- a/writer/JFJochWriterService.h +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (2019-2023) Paul Scherrer Institute - -#ifndef JUNGFRAUJOCH_JFJOCHWRITERSERVICE_H -#define JUNGFRAUJOCH_JFJOCHWRITERSERVICE_H - -#include -#include -#include "StreamWriter.h" - -class JFJochWriterService final : public JFJochProtoBuf::gRPC_JFJochWriter::Service { - std::shared_mutex m; - std::unique_ptr writer; - Logger &logger; - ZMQContext &zmq_context; - std::future writer_future; -public: - JFJochWriterService(ZMQContext &in_context, Logger &in_logger); - JFJochWriterService &BaseDirectory(const std::string& input); - grpc::Status Start(grpc::ServerContext *context, const JFJochProtoBuf::WriterInput *request, - JFJochProtoBuf::Empty *response) override; - grpc::Status Stop(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, - JFJochProtoBuf::WriterOutput *response) override; - grpc::Status Abort(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, - JFJochProtoBuf::Empty *response) override; -}; - - -#endif //JUNGFRAUJOCH_JFJOCHWRITERSERVICE_H diff --git a/writer/StreamWriter.cpp b/writer/StreamWriter.cpp index d4e0709d..6dbd7634 100644 --- a/writer/StreamWriter.cpp +++ b/writer/StreamWriter.cpp @@ -51,8 +51,9 @@ void StreamWriter::Abort() { image_puller.Abort(); } -StreamWriterStatistics StreamWriter::Run() { +StreamWriterStatistics StreamWriter::Run() { StreamWriterStatistics ret; + start_message = StartMessage(); StartDataCollection(); try { CollectImages(ret.data_file_stats); @@ -68,3 +69,7 @@ StreamWriterStatistics StreamWriter::Run() { ret.image_puller_stats.processed_images, ret.image_puller_stats.performance_MBs, ret.image_puller_stats.performance_Hz); return ret; } + +std::future StreamWriter::RunFuture() { + return std::async(std::launch::async, &StreamWriter::Run, this); +} \ No newline at end of file diff --git a/writer/StreamWriter.h b/writer/StreamWriter.h index 7fc18c3c..c62463bb 100644 --- a/writer/StreamWriter.h +++ b/writer/StreamWriter.h @@ -3,6 +3,8 @@ #ifndef JUNGFRAUJOCH_STREAMWRITER_H #define JUNGFRAUJOCH_STREAMWRITER_H +#include + #include "ZMQImagePuller.h" #include "HDF5DataFile.h" @@ -21,7 +23,8 @@ class StreamWriter { void EndDataCollection(); public: StreamWriter(ZMQContext& context, Logger &logger, const std::string& zmq_addr); - StreamWriterStatistics Run(); + StreamWriterStatistics Run(); + std::future RunFuture(); void Abort(); }; diff --git a/writer/jfjoch_writer.cpp b/writer/jfjoch_writer.cpp index 021e81c3..e2828df3 100644 --- a/writer/jfjoch_writer.cpp +++ b/writer/jfjoch_writer.cpp @@ -1,41 +1,23 @@ // Copyright (2019-2023) Paul Scherrer Institute -#include -#include - -#include "HDF5Objects.h" -#include "../grpc/gRPCServer_Template.h" #include "../common/Logger.h" -#include "JFJochWriterService.h" +#include "StreamWriter.h" + +volatile bool quitok = false; int main(int argc, char **argv) { RegisterHDF5Filter(); Logger logger("jfjoch_writer"); - if (argc < 3) { - logger.Error("Usage ./jfjoch_writer "); - exit(EXIT_FAILURE); - } - - std::string grpc_addr = "0.0.0.0:" + std::string(argv[2]); - - nlohmann::json input; - std::ifstream file(argv[1]); - try { - input = nlohmann::json::parse(file); - } catch (const nlohmann::json::exception &e) { - logger.Error("JSON Parsing exception: " + std::string(e.what())); + if (argc < 2) { + logger.Error("Usage ./jfjoch_writer "); exit(EXIT_FAILURE); } ZMQContext context; - JFJochWriterService service(context, logger); + StreamWriter writer(context, logger, argv[1]); - if (input.contains("base_directory")) - service.BaseDirectory(input["base_directory"]); - - auto server = gRPCServer(grpc_addr, service); - logger.Info("gRPC configuration listening on address " + grpc_addr); - server->Wait(); + while (!quitok) + writer.Run(); } \ No newline at end of file diff --git a/writer/jfjoch_writer_multi.cpp b/writer/jfjoch_writer_multi.cpp deleted file mode 100644 index 9f45b91e..00000000 --- a/writer/jfjoch_writer_multi.cpp +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (2019-2023) Paul Scherrer Institute - -#include -#include -#include - -#include "../grpc/gRPCServer_Template.h" -#include "../common/Logger.h" -#include "JFJochWriterService.h" -#include "HDF5Objects.h" - -int main(int argc, char **argv) { - RegisterHDF5Filter(); - - Logger logger("jfjoch_writer"); - - if (argc == 1) - logger.Error("Usage ./jfjoch_writer_multi "); - - std::vector grpc_addr; - std::string base_directory; - { - nlohmann::json input; - std::ifstream file(argv[1]); - try { - input = nlohmann::json::parse(file); - } catch (const nlohmann::json::exception &e) { - logger.Error("JSON Parsing exception: " + std::string(e.what())); - exit(EXIT_FAILURE); - } - if (input.contains("base_directory")) - base_directory = input["base_directory"]; - - if (!input.contains("grpc_addr")) - logger.Error("Input file needs writer services gRPC addresses"); - if (!input["grpc_addr"].is_array()) - logger.Error("grpc_addr must be array"); - - for (const auto &j: input["grpc_addr"]) { - if (j.is_number()) { - int64_t tcp_port = j.get(); - if ((tcp_port <= 0) || (tcp_port >= UINT16_MAX)) - logger.Error("tcp port {} invalid", tcp_port); - grpc_addr.push_back("0.0.0.0:" + std::to_string(tcp_port)); - } else if (j.is_string()) { - grpc_addr.push_back(j.get()); - } else - logger.Error("grpc_addr array element must be string"); - - } - } - - std::vector child_pids; - - for (const auto &addr: grpc_addr) { - Logger logger_local(addr); - - pid_t pid = fork(); - if (pid == 0) { - ZMQContext context; - JFJochWriterService service(context, logger_local); - if (!base_directory.empty()) - service.BaseDirectory(base_directory); - - auto server = gRPCServer(addr, service); - logger_local.Info("gRPC configuration listening on address " + addr); - server->Wait(); - } else - child_pids.push_back(pid); - } - int status; - for (const auto &p: child_pids) - wait(&status); -} \ No newline at end of file