diff --git a/acquisition_device/AcquisitionDevice.cpp b/acquisition_device/AcquisitionDevice.cpp index ecc750de..55fb656d 100644 --- a/acquisition_device/AcquisitionDevice.cpp +++ b/acquisition_device/AcquisitionDevice.cpp @@ -226,3 +226,11 @@ AcquisitionDeviceStatistics AcquisitionDevice::GetStatistics() const { return ret; } + +void AcquisitionDevice::SetIPv4Address(uint32_t ipv4_addr_network_order) { + ipv4_addr = ipv4_addr_network_order; +} + +void AcquisitionDevice::SetMACAddress(uint64_t mac_addr_network_order) { + mac_addr = mac_addr_network_order; +} \ No newline at end of file diff --git a/acquisition_device/AcquisitionDevice.h b/acquisition_device/AcquisitionDevice.h index 664d6290..a6f8aca1 100644 --- a/acquisition_device/AcquisitionDevice.h +++ b/acquisition_device/AcquisitionDevice.h @@ -124,6 +124,8 @@ public: virtual void SetSpotFinderParameters(int16_t count_threshold, double snr_threshold) {}; virtual std::string GetIPv4Address() const; + virtual void SetIPv4Address(uint32_t ipv4_addr_network_order); + virtual void SetMACAddress(uint64_t mac_addr_network_order); virtual std::string GetMACAddress() const; virtual uint16_t GetUDPPort() const; virtual int32_t GetNUMANode() const; @@ -131,6 +133,8 @@ public: virtual std::vector GetInternalGeneratorFrame() const { return {}; } + + }; diff --git a/acquisition_device/AcquisitionDeviceGroup.cpp b/acquisition_device/AcquisitionDeviceGroup.cpp new file mode 100644 index 00000000..db27cb4d --- /dev/null +++ b/acquisition_device/AcquisitionDeviceGroup.cpp @@ -0,0 +1,34 @@ +// Copyright (2019-2023) Paul Scherrer Institute + +#include "AcquisitionDeviceGroup.h" +#include "PCIExpressDevice.h" +#include "MockAcquisitionDevice.h" +#include "HLSSimulatedDevice.h" + +AcquisitionDevice &AcquisitionDeviceGroup::operator[](int idx) { + if ((idx < 0) || (idx >= aq_devices.size())) + throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Device out of bounds"); + if (aq_devices[idx] == nullptr) + throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Device not initialized"); + return *aq_devices[idx]; +} + +size_t AcquisitionDeviceGroup::size() { + return aq_devices.size(); +} + +void AcquisitionDeviceGroup::Add(std::unique_ptr &&device) { + aq_devices.emplace_back(std::move(device)); +} + +void AcquisitionDeviceGroup::AddPCIeDevice(const std::string &device_name) { + aq_devices.emplace_back(std::make_unique(aq_devices.size(), device_name)); +} + +void AcquisitionDeviceGroup::AddHLSDevice(int64_t buffer_size_modules) { + aq_devices.emplace_back(std::make_unique(aq_devices.size(), buffer_size_modules)); +} + +void AcquisitionDeviceGroup::AddMockDevice(int64_t buffer_size_modules) { + aq_devices.emplace_back(std::make_unique(aq_devices.size(), buffer_size_modules)); +} diff --git a/acquisition_device/AcquisitionDeviceGroup.h b/acquisition_device/AcquisitionDeviceGroup.h new file mode 100644 index 00000000..e93ca251 --- /dev/null +++ b/acquisition_device/AcquisitionDeviceGroup.h @@ -0,0 +1,21 @@ +// Copyright (2019-2023) Paul Scherrer Institute + +#ifndef JUNGFRAUJOCH_ACQUISITIONDEVICEGROUP_H +#define JUNGFRAUJOCH_ACQUISITIONDEVICEGROUP_H + +#include +#include "AcquisitionDevice.h" +#include "../common/JFJochException.h" + +class AcquisitionDeviceGroup { + std::vector> aq_devices; +public: + AcquisitionDevice& operator[](int idx); + size_t size(); + void Add(std::unique_ptr &&device); + void AddPCIeDevice(const std::string &device_name); + void AddHLSDevice(int64_t buffer_size_modules); + void AddMockDevice(int64_t buffer_size_modules); +}; + +#endif //JUNGFRAUJOCH_ACQUISITIONDEVICEGROUP_H diff --git a/acquisition_device/CMakeLists.txt b/acquisition_device/CMakeLists.txt index 5849a0b1..43b66255 100644 --- a/acquisition_device/CMakeLists.txt +++ b/acquisition_device/CMakeLists.txt @@ -5,6 +5,8 @@ ADD_LIBRARY(JungfraujochAcqusitionDevice STATIC HLSSimulatedDevice.cpp HLSSimulatedDevice.h Completion.cpp Completion.h ../fpga/pcie_driver/ActionConfig.h PCIExpressDevice.cpp PCIExpressDevice.h - FPGAAcquisitionDevice.cpp FPGAAcquisitionDevice.h) + FPGAAcquisitionDevice.cpp FPGAAcquisitionDevice.h + AcquisitionDeviceGroup.cpp + AcquisitionDeviceGroup.h) TARGET_LINK_LIBRARIES(JungfraujochAcqusitionDevice JungfraujochDevice CommonFunctions HLSSimulation JFCalibration) \ No newline at end of file diff --git a/acquisition_device/FPGAAcquisitionDevice.cpp b/acquisition_device/FPGAAcquisitionDevice.cpp index c4e684d9..096883c5 100644 --- a/acquisition_device/FPGAAcquisitionDevice.cpp +++ b/acquisition_device/FPGAAcquisitionDevice.cpp @@ -244,12 +244,6 @@ void FPGAAcquisitionDevice::Start(const DiffractionExperiment &experiment, uint3 read_work_completion_future = std::async(std::launch::async, &FPGAAcquisitionDevice::ReadWorkCompletionThread, this); } -DataCollectionConfig FPGAAcquisitionDevice::ReadActionRegister() { - DataCollectionConfig cfg{}; - HW_ReadActionRegister(&cfg); - return cfg; -} - std::vector FPGAAcquisitionDevice::GetInternalGeneratorFrame() const { return internal_pkt_gen_frame; } diff --git a/acquisition_device/FPGAAcquisitionDevice.h b/acquisition_device/FPGAAcquisitionDevice.h index 337af6b0..34cdc4c3 100644 --- a/acquisition_device/FPGAAcquisitionDevice.h +++ b/acquisition_device/FPGAAcquisitionDevice.h @@ -42,7 +42,6 @@ protected: memset(status, 0, sizeof(DeviceStatus)); } public: - DataCollectionConfig ReadActionRegister(); void InitializeCalibration(const DiffractionExperiment &experiment, const JFCalibration &calib) override; void InitializeIntegrationMap(const DiffractionExperiment &experiment, const std::vector &v) override; void InitializeIntegrationMap(const DiffractionExperiment &experiment, const std::vector &v, diff --git a/acquisition_device/PCIExpressDevice.cpp b/acquisition_device/PCIExpressDevice.cpp index 4ec8b979..675ff3b1 100644 --- a/acquisition_device/PCIExpressDevice.cpp +++ b/acquisition_device/PCIExpressDevice.cpp @@ -4,12 +4,12 @@ #include "../common/NetworkAddressConvert.h" PCIExpressDevice::PCIExpressDevice(uint16_t data_stream, uint16_t pci_slot) : - PCIExpressDevice("/dev/jfjoch" + std::to_string(pci_slot), data_stream) {} + PCIExpressDevice(data_stream, "/dev/jfjoch" + std::to_string(pci_slot)) {} PCIExpressDevice::PCIExpressDevice(uint16_t data_stream) : -PCIExpressDevice("/dev/jfjoch" + std::to_string(data_stream), data_stream) {} +PCIExpressDevice(data_stream, "/dev/jfjoch" + std::to_string(data_stream)) {} -PCIExpressDevice::PCIExpressDevice(const std::string &device_name, uint16_t data_stream) +PCIExpressDevice::PCIExpressDevice(uint16_t data_stream, const std::string &device_name) : FPGAAcquisitionDevice(data_stream), dev(device_name, true) { DataCollectionStatus status = dev.GetDataCollectionStatus(); diff --git a/acquisition_device/PCIExpressDevice.h b/acquisition_device/PCIExpressDevice.h index d7bd5eb5..c97b7bc4 100644 --- a/acquisition_device/PCIExpressDevice.h +++ b/acquisition_device/PCIExpressDevice.h @@ -20,21 +20,21 @@ class PCIExpressDevice : public FPGAAcquisitionDevice { void HW_LoadInternalGeneratorFrame(uint32_t modules) override; void HW_SetSpotFinderParameters(const SpotFinderParameters ¶ms) override; void FPGA_EndAction() override; + uint32_t GetNumKernelBuffers() const; public: explicit PCIExpressDevice(uint16_t data_stream); PCIExpressDevice(uint16_t data_stream, uint16_t pci_slot); - PCIExpressDevice(const std::string &device_name, uint16_t data_stream); + PCIExpressDevice(uint16_t data_stream, const std::string &device_name); void Cancel() override; void HW_GetStatus(DataCollectionStatus *status) const override; void HW_GetEnvParams(DeviceStatus *status) const override; - uint32_t GetNumKernelBuffers() const; int32_t GetNUMANode() const override; - void SetMACAddress(uint64_t mac_addr_network_order); + void SetMACAddress(uint64_t mac_addr_network_order) override; void SetDefaultMAC(); - void SetIPv4Address(uint32_t ipv4_addr_network_order); + void SetIPv4Address(uint32_t ipv4_addr_network_order) override; std::string GetMACAddress() const override; std::string GetIPv4Address() const override; diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index c15cc8cc..044c35eb 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -21,7 +21,7 @@ inline std::string time_UTC(const std::chrono::time_point &in_aq_device, + AcquisitionDeviceGroup &in_aq_device, ImagePusher &in_image_sender, Logger &in_logger, int64_t in_forward_and_sum_nthreads, int64_t in_send_buffer_count, @@ -84,7 +84,7 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, } for (int d = 0; d < ndatastreams; d++) { - acquisition_device[d]->PrepareAction(experiment); + acquisition_device[d].PrepareAction(experiment); logger.Debug("Acquisition device {} prepared", d); } @@ -235,22 +235,22 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, void JFJochReceiver::AcquireThread(uint16_t data_stream) { try { - NUMAHWPolicy::RunOnNode(acquisition_device[data_stream]->GetNUMANode()); + NUMAHWPolicy::RunOnNode(acquisition_device[data_stream].GetNUMANode()); } catch (const JFJochException &e) { logger.Error("HW bind error {}", e.what()); } try { if (calibration != nullptr) - acquisition_device[data_stream]->InitializeCalibration(experiment, *calibration); + acquisition_device[data_stream].InitializeCalibration(experiment, *calibration); if (rad_int_mapping) - acquisition_device[data_stream]->InitializeIntegrationMap(experiment, + acquisition_device[data_stream].InitializeIntegrationMap(experiment, rad_int_mapping->GetPixelToBinMappingRaw(), rad_int_corr_raw); frame_transformation_ready.wait(); logger.Debug("Device thread {} start FPGA action", data_stream); - acquisition_device[data_stream]->StartAction(experiment); + acquisition_device[data_stream].StartAction(experiment); } catch (const JFJochException &e) { Cancel(e); data_acquisition_ready.count_down(); @@ -260,7 +260,7 @@ void JFJochReceiver::AcquireThread(uint16_t data_stream) { try { logger.Debug("Device thread {} wait for FPGA action complete", data_stream); - acquisition_device[data_stream]->WaitForActionComplete(); + acquisition_device[data_stream].WaitForActionComplete(); } catch (const JFJochException &e) { Cancel(e); } @@ -269,7 +269,7 @@ void JFJochReceiver::AcquireThread(uint16_t data_stream) { void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell) { try { - NUMAHWPolicy::RunOnNode(acquisition_device[data_stream]->GetNUMANode()); + NUMAHWPolicy::RunOnNode(acquisition_device[data_stream].GetNUMANode()); } catch (const JFJochException &e) { logger.Error("HW bind error {}", e.what()); } @@ -298,19 +298,19 @@ void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module try { for (size_t frame = staring_frame; frame < experiment.GetFrameNum(); frame += frame_stride) { // Frame will be processed only if one already collects frame+2 - acquisition_device[data_stream]->Counters().WaitForFrame(frame + 2, module_number); + acquisition_device[data_stream].Counters().WaitForFrame(frame + 2, module_number); if (!storage_cell_G1G2 || (frame % 2 == 1)) { // Partial packets will bring more problems, than benefit - if (acquisition_device[data_stream]->Counters().IsFullModuleCollected(frame, module_number)) { - pedestal_calc.AnalyzeImage((uint16_t *) acquisition_device[data_stream]->GetDeviceOutput(frame, module_number)->pixels); + if (acquisition_device[data_stream].Counters().IsFullModuleCollected(frame, module_number)) { + pedestal_calc.AnalyzeImage((uint16_t *) acquisition_device[data_stream].GetDeviceOutput(frame, module_number)->pixels); } - auto tmp = acquisition_device[data_stream]->Counters().GetCompletion(frame, module_number).debug; + auto tmp = acquisition_device[data_stream].Counters().GetCompletion(frame, module_number).debug; storage_cell_header = (tmp >> 8) & 0xF; } - acquisition_device[data_stream]->FrameBufferRelease(frame, module_number); + acquisition_device[data_stream].FrameBufferRelease(frame, module_number); - UpdateMaxDelay(acquisition_device[data_stream]->Counters().CalculateDelay(frame, module_number)); + UpdateMaxDelay(acquisition_device[data_stream].Counters().CalculateDelay(frame, module_number)); UpdateMaxImage(frame); } @@ -351,9 +351,9 @@ void JFJochReceiver::FrameTransformationThread() { while (images_to_go.Get(image_number) != 0) { try { // If data acquisition is finished and fastest frame for the first device is behind - acquisition_device[0]->Counters().WaitForFrame(image_number + 1); - if (acquisition_device[0]->Counters().IsAcquisitionFinished() && - (acquisition_device[0]->Counters().GetFastestFrameNumber() < image_number)) + acquisition_device[0].Counters().WaitForFrame(image_number + 1); + if (acquisition_device[0].Counters().IsAcquisitionFinished() && + (acquisition_device[0].Counters().GetFastestFrameNumber() < image_number)) continue; DataMessage message{}; @@ -374,13 +374,13 @@ void JFJochReceiver::FrameTransformationThread() { for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { - acquisition_device[d]->Counters().WaitForFrame(image_number + 1, m); + acquisition_device[d].Counters().WaitForFrame(image_number + 1, m); const int16_t *src; - if (acquisition_device[d]->Counters().IsAnyPacketCollected(image_number, m)) { - src = acquisition_device[d]->GetDeviceOutput(image_number, m)->pixels; + if (acquisition_device[d].Counters().IsAnyPacketCollected(image_number, m)) { + src = acquisition_device[d].GetDeviceOutput(image_number, m)->pixels; if (!send_image) { - Completion c = acquisition_device[d]->Counters().GetCompletion(image_number, m); + Completion c = acquisition_device[d].Counters().GetCompletion(image_number, m); // the information is for first module/frame that was collected in full message.bunch_id = c.bunchid; message.jf_info = c.debug; @@ -391,21 +391,21 @@ void JFJochReceiver::FrameTransformationThread() { send_image = true; size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m; - adu_histogram_module[module_abs_number].Add(*acquisition_device[d]->GetDeviceOutput(image_number, m)); - adu_histogram_total.Add(*acquisition_device[d]->GetDeviceOutput(image_number, m)); + adu_histogram_module[module_abs_number].Add(*acquisition_device[d].GetDeviceOutput(image_number, m)); + adu_histogram_total.Add(*acquisition_device[d].GetDeviceOutput(image_number, m)); if (rad_int_profile_image) - rad_int_profile_image->Add(*acquisition_device[d]->GetDeviceOutput(image_number, m)); + rad_int_profile_image->Add(*acquisition_device[d].GetDeviceOutput(image_number, m)); if (find_spots) - strong_pixel_set.ReadFPGAOutput(experiment, *acquisition_device[d]->GetDeviceOutput(image_number, m), m); + strong_pixel_set.ReadFPGAOutput(experiment, *acquisition_device[d].GetDeviceOutput(image_number, m), m); } else - src = acquisition_device[d]->GetErrorFrameBuffer(); + src = acquisition_device[d].GetErrorFrameBuffer(); transformation.ProcessModule(src, m, d); - acquisition_device[d]->FrameBufferRelease(image_number, m); + acquisition_device[d].FrameBufferRelease(image_number, m); } - auto delay = acquisition_device[d]->Counters().CalculateDelay(image_number); + auto delay = acquisition_device[d].Counters().CalculateDelay(image_number); UpdateMaxDelay(delay); if (delay > message.receiver_aq_dev_delay) message.receiver_aq_dev_delay = delay; @@ -496,8 +496,8 @@ float JFJochReceiver::GetEfficiency() const { uint64_t received_packets = 0; for (int d = 0; d < ndatastreams; d++) { - expected_packets += acquisition_device[d]->Counters().GetExpectedPackets(); - received_packets += acquisition_device[d]->Counters().GetTotalPackets(); + expected_packets += acquisition_device[d].Counters().GetExpectedPackets(); + received_packets += acquisition_device[d].Counters().GetTotalPackets(); } if ((expected_packets == received_packets) || (expected_packets == 0)) @@ -510,9 +510,9 @@ JFJochReceiverOutput JFJochReceiver::GetStatistics() const { JFJochReceiverOutput ret; for (int d = 0; d < ndatastreams; d++) { - for (int m = 0; m < acquisition_device[d]->Counters().GetModuleNumber(); m++) { - ret.expected_packets.push_back(acquisition_device[d]->Counters().GetExpectedPacketsPerModule()); - ret.received_packets.push_back(acquisition_device[d]->Counters().GetTotalPackets(m)); + for (int m = 0; m < acquisition_device[d].Counters().GetModuleNumber(); m++) { + ret.expected_packets.push_back(acquisition_device[d].Counters().GetExpectedPacketsPerModule()); + ret.received_packets.push_back(acquisition_device[d].Counters().GetTotalPackets(m)); } } ret.efficiency = GetEfficiency(); @@ -559,7 +559,7 @@ void JFJochReceiver::Cancel() { cancelled = true; for (int d = 0; d < ndatastreams; d++) - acquisition_device[d]->Cancel(); + acquisition_device[d].Cancel(); } void JFJochReceiver::Cancel(const JFJochException &e) { @@ -569,7 +569,7 @@ void JFJochReceiver::Cancel(const JFJochException &e) { cancelled = true; for (int d = 0; d < ndatastreams; d++) - acquisition_device[d]->Cancel(); + acquisition_device[d].Cancel(); } float JFJochReceiver::GetIndexingRate() const { @@ -605,20 +605,18 @@ void JFJochReceiver::FinalizeMeasurement() { if (rad_int_profile) message.rad_int_result["dataset"] = rad_int_profile->GetResult(); for (int i = 0; i < rad_int_profile_per_file.size(); i++) - message.rad_int_result["file" + std::to_string(i)] - = rad_int_profile_per_file[i]->GetResult(); + message.rad_int_result["file" + std::to_string(i)] = rad_int_profile_per_file[i]->GetResult(); message.adu_histogram["total"] = adu_histogram_total.GetHistogram(); for (int i = 0; i < adu_histogram_module.size(); i++) - message.adu_histogram["module" + std::to_string(i)] - = adu_histogram_module[i].GetHistogram(); + message.adu_histogram["module" + std::to_string(i)] = adu_histogram_module[i].GetHistogram(); image_pusher.EndDataCollection(message); logger.Info("Disconnected from writers"); } for (int d = 0; d < ndatastreams; d++) - acquisition_device[d]->Cancel(); + acquisition_device[d].Cancel(); end_time = std::chrono::system_clock::now(); @@ -637,7 +635,7 @@ void JFJochReceiver::SetDataProcessingSettings(const DataProcessingSettings &in_ DiffractionExperiment::CheckDataProcessingSettings(in_data_processing_settings); data_processing_settings = in_data_processing_settings; for (int i = 0; i < ndatastreams; i++) { - acquisition_device[i]->SetSpotFinderParameters(data_processing_settings.photon_count_threshold, + acquisition_device[i].SetSpotFinderParameters(data_processing_settings.photon_count_threshold, data_processing_settings.signal_to_noise_threshold); } } diff --git a/receiver/JFJochReceiver.h b/receiver/JFJochReceiver.h index da90ea5c..455adebc 100644 --- a/receiver/JFJochReceiver.h +++ b/receiver/JFJochReceiver.h @@ -10,7 +10,7 @@ #include #include -#include "../acquisition_device/AcquisitionDevice.h" +#include "../acquisition_device/AcquisitionDeviceGroup.h" #include "../common/DiffractionExperiment.h" #include "../common/JFJochException.h" @@ -80,7 +80,7 @@ class JFJochReceiver { volatile bool cancelled{false}; - std::vector &acquisition_device; + AcquisitionDeviceGroup &acquisition_device; uint16_t ndatastreams{0}; int64_t max_delay = 0; @@ -133,7 +133,7 @@ class JFJochReceiver { public: JFJochReceiver(const DiffractionExperiment& experiment, const JFCalibration *calibration, - std::vector &open_capi_device, + AcquisitionDeviceGroup &acquisition_devices, ImagePusher &image_pusher, Logger &logger, int64_t forward_and_sum_nthreads, int64_t send_buffer_count, diff --git a/receiver/JFJochReceiverService.cpp b/receiver/JFJochReceiverService.cpp index e474795e..3c2ca07a 100644 --- a/receiver/JFJochReceiverService.cpp +++ b/receiver/JFJochReceiverService.cpp @@ -60,9 +60,9 @@ void Convert(const RadialIntegrationProfiles& input, JFJochProtoBuf::RadialInteg } } -JFJochReceiverService::JFJochReceiverService(std::vector &open_capi_device, +JFJochReceiverService::JFJochReceiverService(AcquisitionDeviceGroup &in_aq_devices, Logger &in_logger, ImagePusher &pusher) : - logger(in_logger), aq_devices(open_capi_device), + logger(in_logger), aq_devices(in_aq_devices), image_pusher(pusher), data_processing_settings(DiffractionExperiment::DefaultDataProcessingSettings()) { } @@ -230,11 +230,11 @@ grpc::Status JFJochReceiverService::SetDataProcessingSettings(grpc::ServerContex grpc::Status JFJochReceiverService::GetNetworkConfig(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, JFJochProtoBuf::ReceiverNetworkConfig *response) { - for (const auto &aq: aq_devices) { + for (int i = 0; i < aq_devices.size(); i++) { auto dev_net_cfg = response->add_device(); - dev_net_cfg->set_mac_addr(aq->GetMACAddress()); - dev_net_cfg->set_ipv4_addr(aq->GetIPv4Address()); - dev_net_cfg->set_udp_port(aq->GetUDPPort()); + dev_net_cfg->set_mac_addr(aq_devices[i].GetMACAddress()); + dev_net_cfg->set_ipv4_addr(aq_devices[i].GetIPv4Address()); + dev_net_cfg->set_udp_port(aq_devices[i].GetUDPPort()); } return grpc::Status::OK; } diff --git a/receiver/JFJochReceiverService.h b/receiver/JFJochReceiverService.h index 338fe52d..23b1063c 100644 --- a/receiver/JFJochReceiverService.h +++ b/receiver/JFJochReceiverService.h @@ -13,7 +13,7 @@ class JFJochReceiverService final : public JFJochProtoBuf::gRPC_JFJochReceiver::Service { NUMAHWPolicy numa_policy; std::unique_ptr receiver; - std::vector &aq_devices; + AcquisitionDeviceGroup &aq_devices; std::unique_ptr calibration; std::unique_ptr experiment; Logger &logger; @@ -32,7 +32,7 @@ class JFJochReceiverService final : public JFJochProtoBuf::gRPC_JFJochReceiver:: void FinalizeMeasurement(); DataProcessingSettings data_processing_settings; public: - JFJochReceiverService(std::vector &open_capi_device, + JFJochReceiverService(AcquisitionDeviceGroup &aq_devices, Logger &logger, ImagePusher &pusher); JFJochReceiverService& PreviewPublisher(ZMQPreviewPublisher *in_preview_writer); JFJochReceiverService& NumThreads(int64_t input); diff --git a/receiver/JFJochReceiverTest.cpp b/receiver/JFJochReceiverTest.cpp index a7557180..0b739dd9 100644 --- a/receiver/JFJochReceiverTest.cpp +++ b/receiver/JFJochReceiverTest.cpp @@ -7,7 +7,7 @@ #define STORAGE_CELL_FOR_TEST 11 -JFJochReceiverOutput RunJFJochReceiverTest(std::vector &aq_devices, ImagePusher &pusher, +JFJochReceiverOutput RunJFJochReceiverTest(AcquisitionDeviceGroup &aq_devices, ImagePusher &pusher, const DiffractionExperiment &x, Logger &logger, JFCalibration &calib, uint16_t nthreads, bool abort, @@ -41,19 +41,6 @@ JFJochReceiverOutput RunJFJochReceiverTest(std::vector &aq_ return service.Stop(); } -bool JFJochReceiverTest(JFJochReceiverOutput &output, Logger &logger, - std::vector> &aq_devices, const DiffractionExperiment &x, - uint16_t nthreads, bool abort, - bool verbose, ZMQPreviewPublisher *in_preview_writer, - const std::string &numa_policy) { - std::vector tmp_aq_devices; - for (const auto &i: aq_devices) - tmp_aq_devices.emplace_back(i.get()); - - return JFJochReceiverTest(output, logger, tmp_aq_devices, x, nthreads, - abort, verbose, in_preview_writer, numa_policy); -} - static JFCalibration GeneratePedestalCalibration(const DiffractionExperiment &x) { JFCalibration ret(x); @@ -77,15 +64,15 @@ static JFCalibration GeneratePedestalCalibration(const DiffractionExperiment &x) } bool JFJochReceiverTest(JFJochReceiverOutput &output, Logger &logger, - std::vector &aq_devices, const DiffractionExperiment &x, + AcquisitionDeviceGroup &aq_devices, const DiffractionExperiment &x, uint16_t nthreads, bool abort, - bool verbose, ZMQPreviewPublisher *in_preview_writer, + ZMQPreviewPublisher *in_preview_writer, const std::string &numa_policy) { std::vector raw_expected_image(RAW_MODULE_SIZE * x.GetModulesNum()); for (int i = 0; i < x.GetDataStreamsNum(); i++) { uint32_t module0 = x.GetFirstModuleOfDataStream(i); - auto int_gen_frame = aq_devices[i]->GetInternalGeneratorFrame(); + auto int_gen_frame = aq_devices[i].GetInternalGeneratorFrame(); if (int_gen_frame.size() < x.GetModulesNum(i) * RAW_MODULE_SIZE) { logger.Error("Wrong internal generator frame size"); return false; diff --git a/receiver/JFJochReceiverTest.h b/receiver/JFJochReceiverTest.h index 83690781..97224e0c 100644 --- a/receiver/JFJochReceiverTest.h +++ b/receiver/JFJochReceiverTest.h @@ -5,22 +5,16 @@ #ifndef JUNGFRAUJOCH_JFJOCHRECEIVERTEST_H #define JUNGFRAUJOCH_JFJOCHRECEIVERTEST_H -JFJochReceiverOutput RunJFJochReceiverTest(std::vector &aq_devices, ImagePusher &pusher, - const DiffractionExperiment &x, Logger &logger, JFCalibration &calib, - uint16_t nthreads, bool abort = false, - ZMQPreviewPublisher *in_preview_writer = nullptr, - const std::string &numa_policy = ""); +JFJochReceiverOutput RunJFJochReceiverTest(AcquisitionDeviceGroup &aq_devices, ImagePusher &pusher, + const DiffractionExperiment &x, Logger &logger, JFCalibration &calib, + uint16_t nthreads, bool abort = false, + ZMQPreviewPublisher *in_preview_writer = nullptr, + const std::string &numa_policy = ""); bool JFJochReceiverTest(JFJochReceiverOutput &output, Logger &logger, - std::vector &aq_devices, const DiffractionExperiment &x, + AcquisitionDeviceGroup &aq_devices, const DiffractionExperiment &x, uint16_t nthreads, bool abort = false, - bool verbose = true, ZMQPreviewPublisher *in_preview_writer = nullptr, - const std::string &numa_policy = ""); - -bool JFJochReceiverTest(JFJochReceiverOutput &output, Logger &logger, - std::vector> &aq_devices, const DiffractionExperiment &x, - uint16_t nthreads, bool abort = false, - bool verbose = true, ZMQPreviewPublisher *in_preview_writer = nullptr, + ZMQPreviewPublisher *in_preview_writer = nullptr, const std::string &numa_policy = ""); #endif //JUNGFRAUJOCH_JFJOCHRECEIVERTEST_H diff --git a/receiver/jfjoch_action_test.cpp b/receiver/jfjoch_action_test.cpp index 6ab7e5f1..04bbbd26 100644 --- a/receiver/jfjoch_action_test.cpp +++ b/receiver/jfjoch_action_test.cpp @@ -114,9 +114,7 @@ int main(int argc, char **argv) { logger.Verbose(verbose); - std::vector> pcie_devices; - std::vector> mock_devices; - std::vector aq_devices; + AcquisitionDeviceGroup aq_devices; std::string image_path = std::string(argv[optind]) + "/tests/test_data/mod5_raw0.bin"; std::vector input(RAW_MODULE_SIZE, 0); @@ -138,10 +136,10 @@ int main(int argc, char **argv) { if (numa_node != -1) logger.Info("Pinning stream {} to NUMA node {}", i, numa_node); - mock_devices.push_back(std::make_unique(i, 1024, numa_node)); - mock_devices[i]->SetCustomInternalGeneratorFrame(input); - mock_devices[i]->EnableLogging(&logger); - aq_devices.push_back(mock_devices[i].get()); + auto tmp = std::make_unique(i, 1024, numa_node); + tmp->SetCustomInternalGeneratorFrame(input); + tmp->EnableLogging(&logger); + aq_devices.Add(std::move(tmp)); } } else { @@ -151,12 +149,12 @@ int main(int argc, char **argv) { } for (int i = 0; i < nstreams; i++) { - pcie_devices.push_back(std::make_unique(dev_name[i], i)); - pcie_devices[i]->SetInternalGeneratorFrameForAllModules(input); - pcie_devices[i]->EnableLogging(&logger); - pcie_devices[i]->SetDefaultMAC(); - pcie_devices[i]->SetIPv4Address((i << 24) + 0x010a0a0a); - aq_devices.push_back(pcie_devices[i].get()); + auto tmp = std::make_unique(i, dev_name[i]); + tmp->SetInternalGeneratorFrameForAllModules(input); + tmp->EnableLogging(&logger); + tmp->SetDefaultMAC(); + tmp->SetIPv4Address((i << 24) + 0x010a0a0a); + aq_devices.Add(std::move(tmp)); } } @@ -165,7 +163,7 @@ int main(int argc, char **argv) { bool ret; std::thread run_thread([&] { try { - ret = JFJochReceiverTest(output, logger, aq_devices, x, nthreads, false, verbose, nullptr, numa_policy_name); + ret = JFJochReceiverTest(output, logger, aq_devices, x, nthreads, false, nullptr, numa_policy_name); } catch (std::exception &e) { logger.Error(e.what()); ret = false; @@ -176,12 +174,12 @@ int main(int argc, char **argv) { if (!use_mock_device) { while (!done) { for (int i = 0; i < nstreams; i++) { - auto coll_status = pcie_devices[i]->GetDataCollectionStatus(); - auto dev_status = pcie_devices[i]->GetDeviceStatus(); + auto coll_status = aq_devices[i].GetDataCollectionStatus(); + auto dev_status = aq_devices[i].GetDeviceStatus(); double power = (dev_status.fpga_pcie_12V_I_mA * dev_status.fpga_pcie_12V_V_mV + dev_status.fpga_pcie_3p3V_I_mA * dev_status.fpga_pcie_3p3V_V_mV) / (1000.0 * 1000.0); logger.Info("Device {}: Slowest packet: {:8d} Power: {:5.1f} W FPGA Temp: {:d} degC HBM Temp: {:d}/{:d} degC Stalls: {:15d}/{:15d}", - i, pcie_devices[i]->Counters().GetSlowestFrameNumber(), power, + i, aq_devices[i].Counters().GetSlowestFrameNumber(), power, dev_status.fpga_temp_C, dev_status.hbm_0_temp_C, dev_status.hbm_1_temp_C, coll_status.pipeline_stalls_hbm, coll_status.pipeline_stalls_host); } @@ -204,7 +202,7 @@ int main(int argc, char **argv) { if (!use_mock_device) { logger.Info(""); for (int i = 0; i < nstreams; i++) { - auto coll_status = pcie_devices[i]->GetDataCollectionStatus(); + auto coll_status = aq_devices[i].GetDataCollectionStatus(); auto stalls_hbm = coll_status.pipeline_stalls_hbm; auto stalls_host = coll_status.pipeline_stalls_host; diff --git a/receiver/jfjoch_receiver.cpp b/receiver/jfjoch_receiver.cpp index 94523a05..039e4342 100644 --- a/receiver/jfjoch_receiver.cpp +++ b/receiver/jfjoch_receiver.cpp @@ -6,13 +6,12 @@ #include "../grpc/gRPCServer_Template.h" #include "../common/ZMQImagePusher.h" #include "../acquisition_device/PCIExpressDevice.h" +#include "../acquisition_device/HLSSimulatedDevice.h" #include "JFJochReceiverService.h" -#include "../acquisition_device/HLSSimulatedDevice.h" #include "../common/NetworkAddressConvert.h" -AcquisitionDevice *SetupAcquisitionDevice(const nlohmann::json &input, uint16_t data_stream) { - AcquisitionDevice *ret; +std::unique_ptr SetupAcquisitionDevice(const nlohmann::json &input, uint16_t data_stream) { int16_t numa_node = -1; if (input.contains("numa_node")) @@ -29,9 +28,9 @@ AcquisitionDevice *SetupAcquisitionDevice(const nlohmann::json &input, uint16_t uint32_t ipv4_addr = IPv4AddressFromStr(input["ipv4_addr"].get()); if (input.contains("type") && (input["type"] == "software")) - ret = new HLSSimulatedDevice(data_stream, frame_buffer_size, numa_node); + return std::make_unique(data_stream, frame_buffer_size, numa_node); else if (input.contains("type") && (input["type"] == "pcie")) { - auto pci_dev = new PCIExpressDevice(data_stream, pci_slot); + auto pci_dev = std::make_unique(data_stream, pci_slot); pci_dev->SetIPv4Address(ipv4_addr); if (input.contains("mac_addr")) @@ -46,11 +45,9 @@ AcquisitionDevice *SetupAcquisitionDevice(const nlohmann::json &input, uint16_t file.read((char *) tmp.data(), RAW_MODULE_SIZE * sizeof(uint16_t)); pci_dev->SetInternalGeneratorFrameForAllModules(tmp); } - ret = pci_dev; + return pci_dev; } else throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Unsupported device type"); - - return ret; } int main(int argc, char **argv) { @@ -94,18 +91,11 @@ int main(int argc, char **argv) { logger.Verbose(verbose); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < input["device"].size(); i++) { auto ptr = SetupAcquisitionDevice(input["device"][i], i); ptr->EnableLogging(&logger); - aq_devices.emplace_back(ptr); - } - - std::vector aq_devices_ptr; - for (const auto &i: aq_devices) { - if (verbose) - i->EnableLogging(&logger); - aq_devices_ptr.push_back(i.get()); + aq_devices.Add(std::move(ptr)); } logger.Info("Enabled acquisition device count: " + std::to_string(aq_devices.size())); @@ -126,7 +116,7 @@ int main(int argc, char **argv) { ZMQImagePusher pusher(zmq_addr, send_buffer_high_watermark, send_buffer_size); - JFJochReceiverService service(aq_devices_ptr, logger, pusher); + JFJochReceiverService service(aq_devices, logger, pusher); std::unique_ptr preview, preview_indexed; if (input.contains("preview_zmq_addr")) { diff --git a/tests/JFJochFullIntegrationTest.cpp b/tests/JFJochFullIntegrationTest.cpp index 683a1769..b59e1c50 100644 --- a/tests/JFJochFullIntegrationTest.cpp +++ b/tests/JFJochFullIntegrationTest.cpp @@ -5,13 +5,10 @@ #include "../grpc/gRPCServer_Template.h" #include "../broker/JFJochStateMachine.h" #include "../writer/JFJochWriterService.h" -#include "../writer/HDF5Objects.h" #include "../receiver/JFJochReceiverService.h" #include "FPGAUnitTest.h" #include "../acquisition_device/MockAcquisitionDevice.h" -#include "../acquisition_device/HLSSimulatedDevice.h" #include "../common/ZMQImagePusher.h" -#include "../common/jsonToGrpc.h" using namespace std::literals::chrono_literals; @@ -39,19 +36,15 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") { std::vector image(RAW_MODULE_SIZE); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < ndatastream; i++) { - auto *test = new MockAcquisitionDevice(i, 256); - aq_devices.emplace_back(test); + auto test = std::make_unique(i, 256); + aq_devices.Add(std::move(test)); } - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); - ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer(zmq_context, logger); @@ -80,9 +73,9 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") { for (int i = 0; i < ndatastream; i++) { for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(i); m++) { for (int image_num = 1; image_num <= nimages; image_num++) - aq_devices[i]->AddModule(image_num, m, image.data()); + dynamic_cast(aq_devices[i]).AddModule(image_num, m, image.data()); } - aq_devices[i]->Terminate(); + dynamic_cast(aq_devices[i]).Terminate(); } REQUIRE_NOTHROW(state_machine.Stop()); @@ -134,19 +127,15 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") { std::vector image(RAW_MODULE_SIZE); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < ndatastream; i++) { - auto *test = new MockAcquisitionDevice(i, 256); - aq_devices.emplace_back(test); + auto test = std::make_unique(i, 256); + aq_devices.Add(std::move(test)); } - - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); - + ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer(zmq_context, logger); @@ -176,9 +165,9 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") { for (int i = 0; i < ndatastream; i++) { for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(i); m++) { for (int image_num = 1; image_num <= nimages; image_num++) - aq_devices[i]->AddModule(image_num, m, image.data()); + dynamic_cast(aq_devices[i]).AddModule(image_num, m, image.data()); } - aq_devices[i]->Terminate(); + dynamic_cast(aq_devices[i]).Terminate(); } REQUIRE_NOTHROW(state_machine.Stop()); @@ -230,19 +219,15 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]") std::vector image(RAW_MODULE_SIZE); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < 4; i++) { - auto *test = new MockAcquisitionDevice(i, 256); - aq_devices.emplace_back(test); + auto test = std::make_unique(i, 256); + aq_devices.Add(std::move(test)); } - - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); - + ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer(zmq_context, logger); @@ -271,9 +256,9 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]") for (int i = 0; i < ndatastream; i++) { for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(i); m++) { for (int image_num = 1; image_num <= nimages; image_num++) - aq_devices[i]->AddModule(image_num, m, image.data()); + dynamic_cast(aq_devices[i]).AddModule(image_num, m, image.data()); } - aq_devices[i]->Terminate(); + dynamic_cast(aq_devices[i]).Terminate(); } REQUIRE_NOTHROW(state_machine.Stop()); @@ -324,19 +309,15 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") { std::vector image(RAW_MODULE_SIZE); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < ndatastream; i++) { - auto *test = new MockAcquisitionDevice(i, 256); - aq_devices.emplace_back(test); + auto test = std::make_unique(i, 256); + aq_devices.Add(std::move(test)); } - - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); - + ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer(zmq_context, logger); auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver); @@ -369,9 +350,9 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") { for (int i = 0; i < ndatastream; i++) { for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(i); m++) { for (int image_num = 1; image_num <= nimages; image_num++) - aq_devices[i]->AddModule(image_num, m, image.data()); + dynamic_cast(aq_devices[i]).AddModule(image_num, m, image.data()); } - aq_devices[i]->Terminate(); + dynamic_cast(aq_devices[i]).Terminate(); } REQUIRE_NOTHROW(state_machine.Stop()); @@ -423,19 +404,15 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") { std::vector image(RAW_MODULE_SIZE); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < ndatastream; i++) { - auto *test = new MockAcquisitionDevice(i, 256); - aq_devices.emplace_back(test); + auto test = std::make_unique(i, 256); + aq_devices.Add(std::move(test)); } - - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); - + ZMQImagePusher pusher(zmq_context, {"inproc://#0", "inproc://#1", "inproc://#2"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer_0(zmq_context, logger); JFJochWriterService writer_1(zmq_context, logger); @@ -468,9 +445,9 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") { for (int i = 0; i < ndatastream; i++) { for (int image_num = 1; image_num <= nimages; image_num++) { for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(i); m++) - aq_devices[i]->AddModule(image_num, m, image.data()); + dynamic_cast(aq_devices[i]).AddModule(image_num, m, image.data()); } - aq_devices[i]->Terminate(); + dynamic_cast(aq_devices[i]).Terminate(); } REQUIRE_NOTHROW(state_machine.Stop()); @@ -518,24 +495,20 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") { std::vector image(RAW_MODULE_SIZE); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < ndatastream; i++) { - auto *test = new MockAcquisitionDevice(i, 256); + auto test = std::make_unique(i, 256); test->EnableLogging(&logger); for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(i); m++) { for (int frame = 1; frame <= nimages; frame++) test->AddModule(frame, m, image.data()); } - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } - - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); - + ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer(zmq_context, logger); auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver); @@ -596,25 +569,22 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") { std::vector image(RAW_MODULE_SIZE); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < ndatastream; i++) { - auto *test = new MockAcquisitionDevice(i, 256); + auto test = std::make_unique(i, 256); for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(i); m++) { for (int image_num = 1; image_num <= nimages; image_num++) test->AddModule(image_num, m, image.data()); } test->Terminate(); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } ZMQContext zmq_context; - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer(zmq_context, logger); ZMQPreviewPublisher preview(zmq_context, "inproc://#2"); @@ -703,25 +673,21 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]" std::vector image(RAW_MODULE_SIZE); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < ndatastream; i++) { - auto *test = new MockAcquisitionDevice(i, 256); + auto test = std::make_unique(i, 256); for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(i); m++) { for (int image_num = 1; image_num <= nimages; image_num++) test->AddModule(image_num, m, image.data()); } test->Terminate(); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } ZMQContext zmq_context; - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); - ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer(zmq_context, logger); ZMQPreviewPublisher preview(zmq_context, "inproc://#2"); @@ -826,25 +792,22 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") { image_conv.data() + i * file_space.GetDimensions()[1] * file_space.GetDimensions()[2]); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; - auto *test = new MockAcquisitionDevice(0, 256); + auto test = std::make_unique(0, 256); for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(0); m++) { test->AddModule(1, m, (uint16_t *) (image_raw_geom.data() + m * RAW_MODULE_SIZE)); } test->Terminate(); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); ZMQContext zmq_context; - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer(zmq_context, logger); auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver); @@ -938,7 +901,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver] image_conv.data() + i * file_space.GetDimensions()[1] * file_space.GetDimensions()[2]); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; auto *test = new MockAcquisitionDevice(0, 512); @@ -952,13 +915,10 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver] } test->Terminate(); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); ZMQContext zmq_context; - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); @@ -1059,9 +1019,9 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") { image_conv.data() + i * file_space.GetDimensions()[1] * file_space.GetDimensions()[2]); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; - auto *test = new MockAcquisitionDevice(0, 512); + auto test = std::make_unique(0, 512); for (int m = 0; m < state_machine.NotThreadSafe_Experiment().GetModulesNum(0); m++) { for (int image_num = 1; image_num <= nimages; image_num++) @@ -1073,16 +1033,13 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") { } test->Terminate(); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); ZMQContext zmq_context; - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); - JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver(aq_devices, logger, pusher); JFJochWriterService writer(zmq_context, logger);; auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver); @@ -1174,7 +1131,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_sum", "[JFJochRecei image_conv.data() + i * file_space.GetDimensions()[1] * file_space.GetDimensions()[2]); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; auto *test = new MockAcquisitionDevice(0, 512); @@ -1190,13 +1147,10 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_sum", "[JFJochRecei } test->Terminate(); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); ZMQContext zmq_context; - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); diff --git a/tests/JFJochReceiverIntegrationTest.cpp b/tests/JFJochReceiverIntegrationTest.cpp index 61007576..b20c31a7 100644 --- a/tests/JFJochReceiverIntegrationTest.cpp +++ b/tests/JFJochReceiverIntegrationTest.cpp @@ -17,12 +17,11 @@ TEST_CASE("JFJochReceiverTest_Raw", "[JFJochReceiver]") { x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(true) .ImagesPerTrigger(100).DataFileCount(16).PhotonEnergy_keV(12.4).Compression(JFJochProtoBuf::NO_COMPRESSION); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, 64); + std::unique_ptr test = std::make_unique(i,64); test->SetInternalGeneratorFrame(); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } Logger logger("JFJochReceiverTest_Raw"); @@ -49,12 +48,11 @@ TEST_CASE("JFJochReceiverTest_Conversion", "[JFJochReceiver]") { x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(true) .ImagesPerTrigger(32).DataFileCount(16).PhotonEnergy_keV(12.4).Compression(JFJochProtoBuf::BSHUF_ZSTD); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, 64); - test->EnableLogging(&logger); - aq_devices.emplace_back(test); + std::unique_ptr test = std::make_unique(i,64); + test->SetInternalGeneratorFrame(); + aq_devices.Add(std::move(test)); } JFJochReceiverOutput output; @@ -78,12 +76,11 @@ TEST_CASE("JFJochReceiverTest_Conversion_U16", "[JFJochReceiver]") { .ImagesPerTrigger(32).DataFileCount(16).PhotonEnergy_keV(12.4).Compression(JFJochProtoBuf::BSHUF_ZSTD).FPGAOutputMode(JFJochProtoBuf::UINT16); REQUIRE(!x.IsPixelSigned()); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, 64); - test->EnableLogging(&logger); - aq_devices.emplace_back(test); + std::unique_ptr test = std::make_unique(i,64); + test->SetInternalGeneratorFrame(); + aq_devices.Add(std::move(test)); } JFJochReceiverOutput output; @@ -106,12 +103,11 @@ TEST_CASE("JFJochReceiverTest_Conversion_I32", "[JFJochReceiver]") { x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(true) .ImagesPerTrigger(32).DataFileCount(16).PhotonEnergy_keV(12.4).Compression(JFJochProtoBuf::BSHUF_ZSTD).FPGAOutputMode(JFJochProtoBuf::INT32); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, 64); - test->EnableLogging(&logger); - aq_devices.emplace_back(test); + std::unique_ptr test = std::make_unique(i,64); + test->SetInternalGeneratorFrame(); + aq_devices.Add(std::move(test)); } JFJochReceiverOutput output; @@ -134,12 +130,11 @@ TEST_CASE("JFJochReceiverTest_Conversion_Summation2", "[JFJochReceiver]") { x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(true) .ImagesPerTrigger(32).DataFileCount(16).PhotonEnergy_keV(12.4).Compression(JFJochProtoBuf::BSHUF_ZSTD).Summation(2); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, 64); - test->EnableLogging(&logger); - aq_devices.emplace_back(test); + std::unique_ptr test = std::make_unique(i,64); + test->SetInternalGeneratorFrame(); + aq_devices.Add(std::move(test)); } JFJochReceiverOutput output; @@ -165,12 +160,11 @@ TEST_CASE("JFJochReceiverTest_Conversion_StorageCell", "[JFJochReceiver]") { REQUIRE(x.GetImageNum() == 16); REQUIRE(x.GetStorageCellNumber() == 16); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, 64); - test->EnableLogging(&logger); - aq_devices.emplace_back(test); + std::unique_ptr test = std::make_unique(i,64); + test->SetInternalGeneratorFrame(); + aq_devices.Add(std::move(test)); } JFJochReceiverOutput output; @@ -207,14 +201,14 @@ TEST_CASE("JFJochReceiverTest_PedestalG1", "[JFJochReceiver]") { x.Mode(DetectorMode::PedestalG1).PedestalG0Frames(0) .PedestalG1Frames(nframes).NumTriggers(1).UseInternalPacketGenerator(false) .ImagesPerTrigger(0).PhotonEnergy_keV(12.4); - - std::vector> aq_devices; + + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, 64); + std::unique_ptr test = std::make_unique(i,64); + test->SetInternalGeneratorFrame(); test->CreatePackets(x, 1, nframes, 0, pedestal_in.data(), false); test->CreateFinalPacket(x); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } JFPedestalCalc pc(x); @@ -271,18 +265,15 @@ TEST_CASE("JFJochReceiverTest_PedestalG2_storage_cell", "[JFJochReceiver]") { REQUIRE(x.GetStorageCellNumber() == 2); REQUIRE(x.GetFrameNum() == nframes * 2); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, nframes * 2); + auto test = std::make_unique(i, nframes * 2); for (int j = 0; j < nframes; j++) { - test->CreatePackets(x, 2 * j + 1, 1, 0, - pedestal_in2.data() + j * RAW_MODULE_SIZE, false); - test->CreatePackets(x, 2 * j + 2, 1, 0, - pedestal_in.data() + j * RAW_MODULE_SIZE, false); + test->CreatePackets(x, 2 * j + 1, 1, 0, pedestal_in2.data() + j * RAW_MODULE_SIZE, false); + test->CreatePackets(x, 2 * j + 2, 1, 0, pedestal_in.data() + j * RAW_MODULE_SIZE, false); } test->CreateFinalPacket(x); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } JFPedestalCalc pc(x); @@ -330,13 +321,12 @@ TEST_CASE("JFJochReceiverTest_PedestalG0", "[JFJochReceiver]") { .NumTriggers(1).UseInternalPacketGenerator(false) .ImagesPerTrigger(0).PhotonEnergy_keV(12.4); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, 64); + auto test = std::make_unique(i,64); test->CreatePackets(x, 1, nframes, 0, pedestal_in.data(), false); test->CreateFinalPacket(x); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } JFPedestalCalc pc(x); @@ -378,9 +368,8 @@ TEST_CASE("JFJochReceiverTest_PedestalG0_StorageCell", "[JFJochReceiver]") { .NumTriggers(1).UseInternalPacketGenerator(false) .ImagesPerTrigger(0).PhotonEnergy_keV(12.4); - std::vector> aq_devices; - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(0, 64); + AcquisitionDeviceGroup aq_devices; + auto test = std::make_unique(0, 64); for (int i = 0; i < nframes; i++) { test->CreatePackets(x, i*4+1, 1, 0, pedestal_in_0.data(), false); test->CreatePackets(x, i*4+2, 1, 0, pedestal_in_1.data(), false); @@ -388,7 +377,7 @@ TEST_CASE("JFJochReceiverTest_PedestalG0_StorageCell", "[JFJochReceiver]") { test->CreatePackets(x, i*4+4, 1, 0, pedestal_in_3.data(), false); } test->CreateFinalPacket(x); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); Logger logger("JFJochReceiverTest_PedestalG0_StorageCell"); JFJochReceiverOutput output; @@ -422,12 +411,11 @@ TEST_CASE("JFJochReceiverTest_PedestalG1_NoFrames", "[JFJochReceiver]") { x.PedestalG0Frames(0).PedestalG1Frames(256).NumTriggers(1) .UseInternalPacketGenerator(false).ImagesPerTrigger(0).PhotonEnergy_keV(12.4); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - HLSSimulatedDevice *test; - test = new HLSSimulatedDevice(i, nframes + 8); + auto test = std::make_unique(i, nframes + 8); test->CreateFinalPacket(x); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } Logger logger("JFJochReceiverTest_PedestalG1_NoFrames"); @@ -453,9 +441,9 @@ TEST_CASE("JFJochReceiverTest_PacketLost_Raw", "[JFJochReceiver]") { x.PedestalG0Frames(0).NumTriggers(1) .UseInternalPacketGenerator(false).ImagesPerTrigger(4).PhotonEnergy_keV(12.4); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - auto test = new HLSSimulatedDevice(i, 64); + auto test = std::make_unique(i, 64); test->CreatePackets(x, 1, 1, 0, frame_in.data(), false); test->CreatePackets(x, 1, 1, 1, frame_in.data(), false); test->CreatePackets(x, 2, 1, 0, frame_in.data(), false); @@ -468,22 +456,18 @@ TEST_CASE("JFJochReceiverTest_PacketLost_Raw", "[JFJochReceiver]") { test->CreatePackets(x, 4, 1, 1, frame_in.data(), false); test->CreateFinalPacket(x); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } Logger logger("JFJochReceiverTest_PacketLost_Raw"); JFCalibration calib(x); - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); - TestImagePusher pusher(x.GetImageNum() - 1); - auto receiver_out = RunJFJochReceiverTest(tmp_devices, pusher, x, logger, calib, nthreads, false); + auto receiver_out = RunJFJochReceiverTest(aq_devices, pusher, x, logger, calib, nthreads, false); const auto image = pusher.GetImage(); - REQUIRE(aq_devices[0]->GetBytesReceived() == (8*128-1) * 8192UL); + REQUIRE(aq_devices[0].GetBytesReceived() == (8*128-1) * 8192UL); REQUIRE(image.size() == 2 * RAW_MODULE_SIZE * sizeof(uint16_t)); REQUIRE(pusher.GetCounter() == x.GetImageNum()); @@ -512,26 +496,22 @@ TEST_CASE("JFJochReceiverTest_Cancel", "[JFJochReceiver]") { x.PedestalG0Frames(0).NumTriggers(1) .UseInternalPacketGenerator(false).ImagesPerTrigger(4).PhotonEnergy_keV(12.4); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - auto test = new HLSSimulatedDevice(i, 64); + auto test = std::make_unique(i, 64); test->CreatePackets(x, 1, 1, 0, frame_in.data(), false); test->CreatePackets(x, 1, 1, 1, frame_in.data(), false); test->CreatePackets(x, 2, 1, 0, frame_in.data(), false); test->CreatePackets(x, 2, 1, 1, frame_in.data(), false); test->CreatePackets(x, 3, 1, 0, frame_in.data(), false); test->CreatePackets(x, 3, 1, 1, frame_in.data(), false); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } Logger logger("JFJochReceiverTest_Cancelw"); JFCalibration calib(x); - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); - TestImagePusher pusher(x.GetImageNum() - 1); - auto receiver_out = RunJFJochReceiverTest(tmp_devices, pusher, x, logger, calib, nthreads, true); + auto receiver_out = RunJFJochReceiverTest(aq_devices, pusher, x, logger, calib, nthreads, true); REQUIRE(receiver_out.cancelled); } \ No newline at end of file diff --git a/tests/MockAcquisitionDeviceTest.cpp b/tests/MockAcquisitionDeviceTest.cpp index 923504cb..fb16b7be 100644 --- a/tests/MockAcquisitionDeviceTest.cpp +++ b/tests/MockAcquisitionDeviceTest.cpp @@ -50,15 +50,15 @@ TEST_CASE("JFJochReceiverTest_Raw_MockAcquisitionDevice", "[JFJochReceiver]") { x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(true) .ImagesPerTrigger(100).DataFileCount(16).PhotonEnergy_keV(12.4).Compression(JFJochProtoBuf::NO_COMPRESSION); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { for (auto &j: test_frame) j = dist(g1); - auto *test = new MockAcquisitionDevice(i, 64); + auto test = std::make_unique(i, 64); test->EnableLogging(&logger); test->SetCustomInternalGeneratorFrame(test_frame); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } JFJochReceiverOutput output; @@ -84,12 +84,11 @@ TEST_CASE("JFJochReceiverTest_Conversion_MockAcquisitionDevice", "[JFJochReceive x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(true).DataStreams(x.GetModulesNum()) .ImagesPerTrigger(32).DataFileCount(16).PhotonEnergy_keV(12.4).Compression(JFJochProtoBuf::BSHUF_ZSTD); - std::vector> aq_devices; + AcquisitionDeviceGroup aq_devices; for (int i = 0; i < x.GetDataStreamsNum(); i++) { - AcquisitionDevice *test; - test = new MockAcquisitionDevice(i, 64); + auto test = std::make_unique(i, 64); test->EnableLogging(&logger); - aq_devices.emplace_back(test); + aq_devices.Add(std::move(test)); } JFJochReceiverOutput output; diff --git a/tests/StreamWriterTest.cpp b/tests/StreamWriterTest.cpp index eaeb5525..fa1fa3bf 100644 --- a/tests/StreamWriterTest.cpp +++ b/tests/StreamWriterTest.cpp @@ -28,18 +28,12 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") { *receiver_input.mutable_jungfraujoch_settings() = x; JFModuleGainCalibration gain; - std::vector> aq_devices; - for (int i = 0; i < x.GetDataStreamsNum(); i++) { - auto test = new HLSSimulatedDevice(i, 64); - aq_devices.emplace_back(test); - } - - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); + AcquisitionDeviceGroup aq_devices; + for (int i = 0; i < x.GetDataStreamsNum(); i++) + aq_devices.AddHLSDevice(64); ZMQImagePusher pusher (context, {zmq_addr}); - JFJochReceiverService fpga_receiver_service(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher); ; JFJochProtoBuf::ReceiverOutput receiver_output; @@ -95,18 +89,12 @@ TEST_CASE("JFJochWriterServiceTest_ZMQ","[JFJochWriter]") { JFModuleGainCalibration empty_gain; - std::vector> aq_devices; - for (int i = 0; i < x.GetDataStreamsNum(); i++) { - auto test = new HLSSimulatedDevice(i, 64); - aq_devices.emplace_back(test); - } - - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); + AcquisitionDeviceGroup aq_devices; + for (int i = 0; i < x.GetDataStreamsNum(); i++) + aq_devices.AddHLSDevice(64); ZMQImagePusher pusher (context, {zmq_addr}); - JFJochReceiverService fpga_receiver_service(tmp_devices, logger, pusher); + JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher); JFJochProtoBuf::WriterInput writer_input; writer_input.set_zmq_receiver_address(zmq_addr); diff --git a/tests/gRPCServerTest.cpp b/tests/gRPCServerTest.cpp index 5f710fb1..a7d8c51a 100644 --- a/tests/gRPCServerTest.cpp +++ b/tests/gRPCServerTest.cpp @@ -15,19 +15,14 @@ TEST_CASE("JFJochReceiver_gRPC_server", "[gRPC]") { DiffractionExperiment x(DetectorGeometry(4, 2)); - std::vector> aq_devices; - AcquisitionDevice *test = new HLSSimulatedDevice(0, 64); - aq_devices.emplace_back(test); - - std::vector tmp_devices; - for (const auto &i: aq_devices) - tmp_devices.emplace_back(i.get()); + AcquisitionDeviceGroup aq_devices; + aq_devices.AddHLSDevice(64); ZMQContext zmq_context; Logger logger("receiver"); ZMQImagePusher pusher(zmq_context, {"inproc://1"}); - JFJochReceiverService service(tmp_devices, logger, pusher); + JFJochReceiverService service(aq_devices, logger, pusher); auto server = gRPCServer("unix:receiver_test", service); {