From 8e0edab0ee8d5d127174617fb0dce7908e610a95 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Tue, 19 Sep 2023 12:53:59 +0200 Subject: [PATCH] AcquisitionDevice: Count completed descriptors --- fpga/hls/datamover_model.h | 6 ++++++ jungfrau/ProcessJFPacket.cpp | 9 +++++++-- jungfrau/ProcessJFPacket.h | 2 ++ receiver/AcquisitionDevice.cpp | 6 ++++++ receiver/AcquisitionDevice.h | 3 ++- receiver/HLSSimulatedDevice.cpp | 6 +++++- receiver/HLSSimulatedDevice.h | 1 + receiver/LinuxSocketDevice.cpp | 7 ++++++- receiver/LinuxSocketDevice.h | 3 +++ receiver/MlxRawEthDevice.cpp | 5 ++++- receiver/MlxRawEthDevice.h | 3 +++ receiver/MockAcquisitionDevice.cpp | 12 +++++++++--- receiver/MockAcquisitionDevice.h | 3 ++- receiver/PCIExpressDevice.cpp | 9 ++++++++- receiver/PCIExpressDevice.h | 1 + tests/FPGAIntegrationTest.cpp | 1 + 16 files changed, 66 insertions(+), 11 deletions(-) diff --git a/fpga/hls/datamover_model.h b/fpga/hls/datamover_model.h index f01938e5..a3bf836d 100644 --- a/fpga/hls/datamover_model.h +++ b/fpga/hls/datamover_model.h @@ -17,6 +17,7 @@ template class Datamover { std::thread datamover_thread; size_t bytes_transferred; + std::atomic descriptors_done = 0; void Move() { axis_datamover_ctrl command = control.read(); @@ -55,6 +56,7 @@ template class Datamover { bytes_transferred += N/8; } } + descriptors_done++; } public: void Run() { @@ -79,6 +81,10 @@ public: return control.empty(); } + uint32_t GetCompletedDescriptors() const { + return descriptors_done; + } + hls::stream& GetCtrlStream() { return control; } hls::stream >& GetDataStream() { return data; } ~Datamover() { Stop(); } diff --git a/jungfrau/ProcessJFPacket.cpp b/jungfrau/ProcessJFPacket.cpp index 00c41b2e..9fcd9dfd 100644 --- a/jungfrau/ProcessJFPacket.cpp +++ b/jungfrau/ProcessJFPacket.cpp @@ -42,9 +42,10 @@ void ProcessJFPacket::ProcessPacket(jf_udp_payload *datagram) { { std::unique_lock ul(m[module_info_location]); if (module_info[module_info_location].c.frame_number != frame_number) 
{ - if (module_info[module_info_location].c.frame_number != UINT64_MAX) + if (module_info[module_info_location].c.frame_number != UINT64_MAX) { + completed_descriptors++; c_fifo.Put(module_info[module_info_location].c); - + } auto wr = wr_fifo.GetBlocking(); module_info[module_info_location].c.type = Completion::Type::Image; @@ -74,3 +75,7 @@ void ProcessJFPacket::ProcessPacket(jf_udp_payload *datagram) { uint64_t ProcessJFPacket::GetCounter() { return packet_counter; } + +uint32_t ProcessJFPacket::GetCompletedDescriptors() const { + return completed_descriptors; +} diff --git a/jungfrau/ProcessJFPacket.h b/jungfrau/ProcessJFPacket.h index 9fb316c2..7a287afd 100644 --- a/jungfrau/ProcessJFPacket.h +++ b/jungfrau/ProcessJFPacket.h @@ -18,6 +18,7 @@ struct ModuleInfo { }; class ProcessJFPacket { + std::atomic completed_descriptors; std::vector m; ThreadSafeFIFO &c_fifo; ThreadSafeFIFO &wr_fifo; @@ -28,6 +29,7 @@ public: ~ProcessJFPacket(); void ProcessPacket(jf_udp_payload *datagram); uint64_t GetCounter(); + uint32_t GetCompletedDescriptors() const; }; diff --git a/receiver/AcquisitionDevice.cpp b/receiver/AcquisitionDevice.cpp index 95c1217e..989a2c59 100644 --- a/receiver/AcquisitionDevice.cpp +++ b/receiver/AcquisitionDevice.cpp @@ -75,6 +75,8 @@ void AcquisitionDevice::StartAction(const DiffractionExperiment &experiment) { if (c.type != Completion::Type::Start) throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError, "Mismatch in work completions"); + work_completion_count = 0; + StartSendingWorkRequests(); start_time = std::chrono::system_clock::now(); @@ -87,6 +89,10 @@ void AcquisitionDevice::WaitForActionComplete() { auto c = work_completion_queue.GetBlocking(); while (c.type != Completion::Type::End) { + work_completion_count++; + while (work_completion_count > GetCompletedDescriptors()) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + if (c.frame_number >= expected_frames) { Cancel(); // this frame is not of any interest, 
therefore its location can be immediately released diff --git a/receiver/AcquisitionDevice.h b/receiver/AcquisitionDevice.h index 4a41d078..309167ea 100644 --- a/receiver/AcquisitionDevice.h +++ b/receiver/AcquisitionDevice.h @@ -45,6 +45,7 @@ protected: uint32_t max_modules = 1; uint64_t mac_addr; uint32_t ipv4_addr; + std::atomic work_completion_count; explicit AcquisitionDevice(uint16_t data_stream); @@ -85,7 +86,7 @@ public: virtual std::string GetMACAddress() const; virtual uint16_t GetUDPPort() const; virtual int32_t GetNUMANode() const; - + virtual uint32_t GetCompletedDescriptors() const = 0; virtual std::vector GetInternalGeneratorFrame() const { return {}; } diff --git a/receiver/HLSSimulatedDevice.cpp b/receiver/HLSSimulatedDevice.cpp index 51b3259e..c062aeb6 100644 --- a/receiver/HLSSimulatedDevice.cpp +++ b/receiver/HLSSimulatedDevice.cpp @@ -30,7 +30,7 @@ uint16_t checksum(const uint16_t *addr, size_t count) { HLSSimulatedDevice::HLSSimulatedDevice(uint16_t data_stream, size_t in_frame_buffer_size_modules, int16_t numa_node) : FPGAAcquisitionDevice(data_stream), datamover_in(Direction::Input), - datamover_out(Direction::Output, nullptr, 256), + datamover_out(Direction::Output, nullptr), datamover_out_hbm_0(Direction::Output, (char *) hbm.data(), 16), datamover_out_hbm_1(Direction::Output, (char *) hbm.data(), 16), datamover_in_hbm_0(Direction::Input, (char *) hbm.data()), @@ -437,4 +437,8 @@ void HLSSimulatedDevice::HW_LoadCalibration(uint32_t modules, uint32_t storage_c if (!datamover_in.GetDataStream().empty()) throw std::runtime_error("Datamover queue is not empty"); +} + +uint32_t HLSSimulatedDevice::GetCompletedDescriptors() const { + return datamover_out.GetCompletedDescriptors(); } \ No newline at end of file diff --git a/receiver/HLSSimulatedDevice.h b/receiver/HLSSimulatedDevice.h index e2b8b315..9cb83cc8 100644 --- a/receiver/HLSSimulatedDevice.h +++ b/receiver/HLSSimulatedDevice.h @@ -68,6 +68,7 @@ public: void CreateFinalPacket(const 
DiffractionExperiment& experiment); AXI_STREAM &OutputStream(); void Cancel() override; + uint32_t GetCompletedDescriptors() const override; }; diff --git a/receiver/LinuxSocketDevice.cpp b/receiver/LinuxSocketDevice.cpp index 033c57fe..09b4f537 100644 --- a/receiver/LinuxSocketDevice.cpp +++ b/receiver/LinuxSocketDevice.cpp @@ -35,6 +35,7 @@ void LinuxSocketDevice::MeasureThread(int fd) { auto count = recv(fd, &jf, sizeof(jf_udp_payload), 0); if (count == sizeof(jf_udp_payload)) { process.ProcessPacket(&jf); + completed_descriptors = process.GetCompletedDescriptors(); } else if ((count == -1) && (errno != EAGAIN) && (errno != EWOULDBLOCK)) throw JFJochException(JFJochExceptionCategory::UDPError, "Error in UDP receiving"); } @@ -79,8 +80,8 @@ void LinuxSocketDevice::Start(const DiffractionExperiment& experiment) { if (bind(fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) != 0) throw JFJochException(JFJochExceptionCategory::UDPError, "Cannot bind to UDP port"); + completed_descriptors = 0; cancel = false; - measure = std::async(std::launch::async, &LinuxSocketDevice::MeasureThread, this, fd); } @@ -134,3 +135,7 @@ void LinuxSocketDevice::FindMACAddress() { freeifaddrs(ifaddr); } + +uint32_t LinuxSocketDevice::GetCompletedDescriptors() const { + return completed_descriptors; +} \ No newline at end of file diff --git a/receiver/LinuxSocketDevice.h b/receiver/LinuxSocketDevice.h index 02ab1516..80d21e4e 100644 --- a/receiver/LinuxSocketDevice.h +++ b/receiver/LinuxSocketDevice.h @@ -8,6 +8,8 @@ #include "AcquisitionDevice.h" class LinuxSocketDevice : public AcquisitionDevice { + std::atomic completed_descriptors = 0; + int32_t rcv_buf_size; uint16_t udp_port; const int16_t numa_node; @@ -30,6 +32,7 @@ public: int32_t GetNUMANode() const override; uint16_t GetUDPPort() const override; void Cancel() override; + uint32_t GetCompletedDescriptors() const override; }; #endif //JUNGFRAUJOCH_LINUXSOCKETDEVICE_H diff --git a/receiver/MlxRawEthDevice.cpp 
b/receiver/MlxRawEthDevice.cpp index 835f57c1..8dc087bc 100644 --- a/receiver/MlxRawEthDevice.cpp +++ b/receiver/MlxRawEthDevice.cpp @@ -76,7 +76,6 @@ void MlxRawEthDevice::Start(const DiffractionExperiment& experiment) { if (experiment.GetConversionOnFPGA()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Conversion on CPU flag has to be enabled for Raw Ethernet device"); - cancel = false; measure = std::async(std::launch::async, &MlxRawEthDevice::MeasureThread, this); } @@ -111,6 +110,7 @@ void MlxRawEthDevice::PollCQ(IBRegBuffer &buffer, IBQueuePair &qp, IBCompletionQ if ((i < BUFFER_COUNT) && (size == sizeof(jf_raw_packet))) { auto ptr = (jf_raw_packet *) buffer.GetLocation(i); process.ProcessPacket(&ptr->jf); + completed_descriptors = process.GetCompletedDescriptors(); qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE); } } else @@ -168,5 +168,8 @@ void MlxRawEthDevice::SendARP(IBRegBuffer &buffer, IBQueuePair &qp) { } } +uint32_t MlxRawEthDevice::GetCompletedDescriptors() const { + return completed_descriptors; +} #endif //JFJOCH_USE_IBVERBS \ No newline at end of file diff --git a/receiver/MlxRawEthDevice.h b/receiver/MlxRawEthDevice.h index 8de9d6a6..093a593a 100644 --- a/receiver/MlxRawEthDevice.h +++ b/receiver/MlxRawEthDevice.h @@ -16,6 +16,8 @@ class MlxRawEthDevice : public AcquisitionDevice { constexpr const static size_t BUFFER_SIZE = 9000; constexpr const static size_t BUFFER_COUNT = 4096; + std::atomic completed_descriptors = 0; + std::mutex m; IBContext context; @@ -42,6 +44,7 @@ public: void SetMACAddress(uint64_t mac_addr_network_order); void SetIPv4Address(uint32_t ipv4_addr_network_order); void Cancel() override; + uint32_t GetCompletedDescriptors() const override; }; #endif //JUNGFRAUJOCH_RAWETHDEVICE_H diff --git a/receiver/MockAcquisitionDevice.cpp b/receiver/MockAcquisitionDevice.cpp index 79cd7411..ab2e4f0b 100644 --- a/receiver/MockAcquisitionDevice.cpp +++ 
b/receiver/MockAcquisitionDevice.cpp @@ -8,6 +8,7 @@ void MockAcquisitionDevice::Start(const DiffractionExperiment& experiment) { idle = false; cancel = false; + completed_descriptors = 0; if (experiment.IsUsingInternalPacketGen()) { if (experiment.GetConversionOnFPGA() && (experiment.GetDetectorMode() == DetectorMode::Conversion)) { @@ -79,9 +80,10 @@ void MockAcquisitionDevice::MeasureThread() { Completion c{}; if (mock_completions.Get(c)) { - if (c.type == Completion::Type::Image) + if (c.type == Completion::Type::Image) { work_completion_queue.Put(c); - else + completed_descriptors++; + } else cancel = true; } else std::this_thread::sleep_for(std::chrono::microseconds(100)); @@ -111,7 +113,7 @@ void MockAcquisitionDevice::InternalPacketGeneratorThread(uint32_t nmodules, uin c.packet_mask[1] = UINT64_MAX; c.packet_count = 128; work_completion_queue.Put(c); - + completed_descriptors++; curr_module++; if (curr_module == nmodules) { curr_module = 0; @@ -168,3 +170,7 @@ void MockAcquisitionDevice::InitializeCalibration(const DiffractionExperiment &e std::vector MockAcquisitionDevice::GetInternalGeneratorFrame() const { return internal_pkt_gen_frame; } + +uint32_t MockAcquisitionDevice::GetCompletedDescriptors() const { + return completed_descriptors; +} diff --git a/receiver/MockAcquisitionDevice.h b/receiver/MockAcquisitionDevice.h index 045fe81a..3f60e8d5 100644 --- a/receiver/MockAcquisitionDevice.h +++ b/receiver/MockAcquisitionDevice.h @@ -10,7 +10,7 @@ class MockAcquisitionDevice : public AcquisitionDevice { uint32_t current_handle = 0; uint32_t max_handle = 0; - + std::atomic completed_descriptors = 0; bool idle = true; std::future measure; volatile bool cancel = false; @@ -38,6 +38,7 @@ public: void SetCustomInternalGeneratorFrame(const std::vector &v); std::vector GetInternalGeneratorFrame() const override; void InitializeCalibration(const DiffractionExperiment &experiment, const JFCalibration &calib) override; + uint32_t GetCompletedDescriptors() const 
override; }; #endif //JUNGFRAUJOCH_MOCKACQUISITIONDEVICE_H diff --git a/receiver/PCIExpressDevice.cpp b/receiver/PCIExpressDevice.cpp index 90c0925d..df708470 100644 --- a/receiver/PCIExpressDevice.cpp +++ b/receiver/PCIExpressDevice.cpp @@ -236,4 +236,11 @@ void PCIExpressDevice::HW_LoadCalibration(uint32_t in_modules, uint32_t in_stora if (ioctl(fd, IOCTL_JFJOCH_LOAD_CALIB, &config) != 0) throw JFJochException(JFJochExceptionCategory::PCIeError, "Failed writing config", errno); -} \ No newline at end of file +} + +uint32_t PCIExpressDevice::GetCompletedDescriptors() const { + uint32_t ret = 0; + if (ioctl(fd, IOCTL_JFJOCH_C2H_DMA_DESC, &ret) != 0) + throw JFJochException(JFJochExceptionCategory::PCIeError, "Failed getting C2H completed descriptor count", errno); + return ret; +} diff --git a/receiver/PCIExpressDevice.h b/receiver/PCIExpressDevice.h index 511f28cc..f7f956da 100644 --- a/receiver/PCIExpressDevice.h +++ b/receiver/PCIExpressDevice.h @@ -42,6 +42,7 @@ public: std::string GetMACAddress() const override; std::string GetIPv4Address() const override; + uint32_t GetCompletedDescriptors() const override; }; #endif //JUNGFRAUJOCH_PCIEXPRESSDEVICE_H diff --git a/tests/FPGAIntegrationTest.cpp b/tests/FPGAIntegrationTest.cpp index eb0d135e..fe740c28 100644 --- a/tests/FPGAIntegrationTest.cpp +++ b/tests/FPGAIntegrationTest.cpp @@ -36,6 +36,7 @@ TEST_CASE("HLS_C_Simulation_internal_packet_generator", "[FPGA][Full]") { REQUIRE(imageBuf[i] == i % 65536); } } + REQUIRE(test.GetCompletedDescriptors() == (4 + DELAY_FRAMES_STOP_AND_QUIT - 1) * nmodules); } TEST_CASE("HLS_C_Simulation_internal_packet_generator_skip_packets", "[FPGA][Full]") {