From bf2a23ef7e997eaad4e2da1b978994ea45cb5b07 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Tue, 25 Apr 2023 15:58:07 +0200 Subject: [PATCH] AcquisitionDevice: Completion queue is handled by particular implementation of the device --- receiver/host/AcquisitionDevice.cpp | 20 +++++------------- receiver/host/AcquisitionDevice.h | 11 ++++------ receiver/host/FPGAAcquisitionDevice.cpp | 28 ++++++++++++++++++------- receiver/host/FPGAAcquisitionDevice.h | 10 ++++++++- receiver/host/HLSSimulatedDevice.cpp | 7 ++++++- receiver/host/HLSSimulatedDevice.h | 3 ++- receiver/host/LinuxSocketDevice.cpp | 10 +++------ receiver/host/LinuxSocketDevice.h | 3 --- receiver/host/MlxRawEthDevice.cpp | 10 +++------ receiver/host/MlxRawEthDevice.h | 2 -- receiver/host/MockAcquisitionDevice.cpp | 17 +++++---------- receiver/host/MockAcquisitionDevice.h | 5 +---- receiver/host/PCIExpressDevice.cpp | 4 ++-- receiver/host/PCIExpressDevice.h | 4 ++-- tests/MockAcquisitionDeviceTest.cpp | 1 + 15 files changed, 64 insertions(+), 71 deletions(-) diff --git a/receiver/host/AcquisitionDevice.cpp b/receiver/host/AcquisitionDevice.cpp index 39c5b27d..1d3ee51b 100644 --- a/receiver/host/AcquisitionDevice.cpp +++ b/receiver/host/AcquisitionDevice.cpp @@ -118,22 +118,20 @@ void AcquisitionDevice::StartAction(const DiffractionExperiment &experiment) { throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError, "Mismatch between expected and actual values of configuration registers (#modules)"); - // Ensure internal queues are empty + // Ensure internal WR queue is empty work_request_queue.Clear(); - work_completion_queue.Clear(); HW_StartAction(); send_work_request_future = std::async(std::launch::async, &AcquisitionDevice::SendWorkRequestThread, this); - read_work_completion_future = std::async(std::launch::async, &AcquisitionDevice::ReadWorkCompletionThread, this); + + for (uint32_t i = 0; i < buffer_device.size(); i++) + SendWorkRequest(i); auto c = work_completion_queue.GetBlocking(); if (c.type != Completion::Type::Start) throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError, "Mismatch in completion queue"); - for (uint32_t i = 0; i < buffer_device.size(); i++) - SendWorkRequest(i); - start_time = std::chrono::system_clock::now(); } @@ -162,14 +160,6 @@ const std::vector &AcquisitionDevice::GetInternalGeneratorFrame() cons return internal_pkt_gen_frame; } -void AcquisitionDevice::ReadWorkCompletionThread() { - Completion c; - do { - c = ReadCompletion(); - work_completion_queue.PutBlocking(c); - } while (c.type != Completion::Type::End); -} - void AcquisitionDevice::WaitForActionComplete() { auto c = work_completion_queue.GetBlocking(); @@ -193,7 +183,7 @@ void AcquisitionDevice::WaitForActionComplete() { c = work_completion_queue.GetBlocking(); } bytes_received = c.frame_number * 8192LU; - read_work_completion_future.get(); + counters.SetAcquisitionFinished(); end_time = std::chrono::system_clock::now(); diff --git a/receiver/host/AcquisitionDevice.h b/receiver/host/AcquisitionDevice.h index cf0b9355..946d8f62 100644 --- a/receiver/host/AcquisitionDevice.h +++ b/receiver/host/AcquisitionDevice.h @@ -38,10 +38,6 @@ class AcquisitionDevice { int64_t expected_frames; AcquisitionOnlineCounters counters; - ThreadSafeFIFO work_completion_queue; - std::future read_work_completion_future; - void ReadWorkCompletionThread(); - ThreadSafeSet work_request_queue; std::future send_work_request_future; @@ -50,15 +46,12 @@ class AcquisitionDevice { void SendWorkRequestThread(); void EndWorkRequestAndSignalQueues(); - virtual Completion ReadCompletion() = 0; - virtual void HW_WriteActionRegister(const ActionConfig *job) = 0; virtual void HW_ReadActionRegister(ActionConfig *job) = 0; virtual void HW_StartAction() = 0; virtual bool HW_IsIdle() const = 0; virtual void HW_SetCancelDataCollectionBit() = 0; - virtual bool HW_SendWorkRequest(uint32_t handle) = 0; virtual void HW_GetStatus(ActionStatus *status) const = 0; virtual void HW_GetEnvParams(ActionEnvParams *status) const { memset(status, 0, sizeof(ActionEnvParams)); @@ -68,6 +61,8 @@ class AcquisitionDevice { virtual void HW_EndAction() {}; // do clean-up after action is done virtual void CopyInternalPacketGenFrameToDeviceBuffer(); protected: + ThreadSafeFIFO work_completion_queue; + std::vector buffer_device; std::vector internal_pkt_gen_frame; @@ -79,6 +74,8 @@ protected: void UnmapBuffers(); void MapBuffersStandard(size_t c2h_buffer_count, size_t h2c_buffer_count, int16_t numa_node); + + virtual bool HW_SendWorkRequest(uint32_t handle) = 0; public: static constexpr const uint64_t HandleNotValid = UINT64_MAX; diff --git a/receiver/host/FPGAAcquisitionDevice.cpp b/receiver/host/FPGAAcquisitionDevice.cpp index 061ba479..e838eb6a 100644 --- a/receiver/host/FPGAAcquisitionDevice.cpp +++ b/receiver/host/FPGAAcquisitionDevice.cpp @@ -3,11 +3,25 @@ #include "FPGAAcquisitionDevice.h" -Completion FPGAAcquisitionDevice::ReadCompletion() { - uint32_t values[12]; - while (!HW_ReadMailbox(values)) { - // The receiving FIFO level is less than or equal to the RIT threshold - std::this_thread::sleep_for(std::chrono::microseconds(10)); - } - return parse_hw_completion(values); +void FPGAAcquisitionDevice::HW_StartAction() { + FPGA_StartAction(); + read_work_completion_future = std::async(std::launch::async, &FPGAAcquisitionDevice::ReadWorkCompletionThread, this); } + +void FPGAAcquisitionDevice::HW_EndAction() { + read_work_completion_future.get(); + FPGA_EndAction(); +} + +void FPGAAcquisitionDevice::ReadWorkCompletionThread() { + uint32_t values[12]; + + Completion c; + do { + while (!HW_ReadMailbox(values)) + std::this_thread::sleep_for(std::chrono::microseconds(10)); + + c = parse_hw_completion(values); + work_completion_queue.PutBlocking(c); + } while (c.type != Completion::Type::End); +} \ No newline at end of file diff --git a/receiver/host/FPGAAcquisitionDevice.h b/receiver/host/FPGAAcquisitionDevice.h index 61625cd6..ee690741 100644 --- a/receiver/host/FPGAAcquisitionDevice.h +++ b/receiver/host/FPGAAcquisitionDevice.h @@ -8,7 +8,15 @@ class FPGAAcquisitionDevice : public AcquisitionDevice { virtual bool HW_ReadMailbox(uint32_t values[12]) = 0; - Completion ReadCompletion() override; + + virtual void FPGA_StartAction() = 0; + virtual void FPGA_EndAction() = 0; + + void HW_StartAction() final; + void HW_EndAction() final; + + std::future read_work_completion_future; + void ReadWorkCompletionThread(); protected: explicit FPGAAcquisitionDevice(uint16_t data_stream) : AcquisitionDevice(data_stream) {} }; diff --git a/receiver/host/HLSSimulatedDevice.cpp b/receiver/host/HLSSimulatedDevice.cpp index 5f561798..68b39d29 100644 --- a/receiver/host/HLSSimulatedDevice.cpp +++ b/receiver/host/HLSSimulatedDevice.cpp @@ -135,7 +135,7 @@ void HLSSimulatedDevice::HW_WriteActionRegister(const ActionConfig *job) { memcpy(&cfg, job, sizeof(ActionConfig)); } -void HLSSimulatedDevice::HW_StartAction() { +void HLSSimulatedDevice::FPGA_StartAction() { if (action_thread.joinable()) action_thread.join(); @@ -145,6 +145,11 @@ void HLSSimulatedDevice::HW_StartAction() { action_thread = std::thread(&HLSSimulatedDevice::HLSMainThread, this ); } +void HLSSimulatedDevice::FPGA_EndAction() { + if (action_thread.joinable()) + action_thread.join(); +} + HLSSimulatedDevice::~HLSSimulatedDevice() { if (action_thread.joinable()) action_thread.join(); diff --git a/receiver/host/HLSSimulatedDevice.h b/receiver/host/HLSSimulatedDevice.h index 7c5958e9..3bbe69b6 100644 --- a/receiver/host/HLSSimulatedDevice.h +++ b/receiver/host/HLSSimulatedDevice.h @@ -39,7 +39,8 @@ class HLSSimulatedDevice : public FPGAAcquisitionDevice { void HW_ReadActionRegister(ActionConfig *job) override; void HW_WriteActionRegister(const ActionConfig *job) override; - void HW_StartAction() override; + void FPGA_StartAction() override; + void FPGA_EndAction() override; bool HW_IsIdle() const override; bool HW_ReadMailbox(uint32_t values[12]); void HW_SetCancelDataCollectionBit() override; diff --git a/receiver/host/LinuxSocketDevice.cpp b/receiver/host/LinuxSocketDevice.cpp index c694893a..b22de9f5 100644 --- a/receiver/host/LinuxSocketDevice.cpp +++ b/receiver/host/LinuxSocketDevice.cpp @@ -28,12 +28,12 @@ void LinuxSocketDevice::MeasureThread(int fd) { uint64_t packet_count = 0; - completion_queue.Put(Completion{ + work_completion_queue.Put(Completion{ .type = Completion::Type::Start }); try { - ProcessJFPacket process(completion_queue, wr_queue, max_modules); + ProcessJFPacket process(work_completion_queue, wr_queue, max_modules); while (!cancel) { auto count = recv(fd, &jf, sizeof(jf_udp_payload), 0); @@ -49,7 +49,7 @@ void LinuxSocketDevice::MeasureThread(int fd) { logger->ErrorException(e); } // End message should be sent always - completion_queue.Put(Completion{ + work_completion_queue.Put(Completion{ .type = Completion::Type::End, .frame_number = packet_count }); @@ -57,10 +57,6 @@ void LinuxSocketDevice::MeasureThread(int fd) { idle = true; } -Completion LinuxSocketDevice::ReadCompletion() { - return completion_queue.GetBlocking(); -} - void LinuxSocketDevice::HW_WriteActionRegister(const ActionConfig *job) { memcpy(&cfg, job, sizeof(ActionConfig)); } diff --git a/receiver/host/LinuxSocketDevice.h b/receiver/host/LinuxSocketDevice.h index 57eedcde..69a9c415 100644 --- a/receiver/host/LinuxSocketDevice.h +++ b/receiver/host/LinuxSocketDevice.h @@ -9,10 +9,8 @@ #include "AcquisitionDevice.h" class LinuxSocketDevice : public AcquisitionDevice { - ThreadSafeFIFO completion_queue; ThreadSafeFIFO wr_queue; - int32_t rcv_buf_size; uint64_t mac_addr; @@ -26,7 +24,6 @@ class LinuxSocketDevice : public AcquisitionDevice { volatile bool cancel = false; volatile bool idle = true; - Completion ReadCompletion() override; void HW_WriteActionRegister(const ActionConfig *job) override; void HW_ReadActionRegister(ActionConfig *job) override; void HW_StartAction() override; diff --git a/receiver/host/MlxRawEthDevice.cpp b/receiver/host/MlxRawEthDevice.cpp index 1c344bd1..8eb6b21f 100644 --- a/receiver/host/MlxRawEthDevice.cpp +++ b/receiver/host/MlxRawEthDevice.cpp @@ -29,10 +29,6 @@ int32_t MlxRawEthDevice::GetNUMANode() const { return numa_node; } -Completion MlxRawEthDevice::ReadCompletion() { - return completion_queue.GetBlocking(); -} - void MlxRawEthDevice::HW_WriteActionRegister(const ActionConfig *job) { memcpy(&cfg, job, sizeof(ActionConfig)); } @@ -45,7 +41,7 @@ void MlxRawEthDevice::MeasureThread() { uint64_t packet_count = 0; - completion_queue.Put(Completion{ + work_completion_queue.Put(Completion{ .type = Completion::Type::Start }); @@ -54,7 +50,7 @@ void MlxRawEthDevice::MeasureThread() { IBCompletionQueue cq(context, BUFFER_COUNT+2); IBQueuePair qp(pd, cq, 16, BUFFER_COUNT); IBRegBuffer buffer(pd, BUFFER_COUNT, BUFFER_SIZE, numa_node); - ProcessJFPacket process(completion_queue, wr_queue, cfg.nmodules); + ProcessJFPacket process(work_completion_queue, wr_queue, cfg.nmodules); qp.Init(); qp.ReadyToReceive(); @@ -86,7 +82,7 @@ void MlxRawEthDevice::MeasureThread() { logger->ErrorException(e); } - completion_queue.Put(Completion{ + work_completion_queue.Put(Completion{ .type = Completion::Type::End, .frame_number = packet_count }); diff --git a/receiver/host/MlxRawEthDevice.h b/receiver/host/MlxRawEthDevice.h index 2d40bcf7..15c15265 100644 --- a/receiver/host/MlxRawEthDevice.h +++ b/receiver/host/MlxRawEthDevice.h @@ -19,7 +19,6 @@ class MlxRawEthDevice : public AcquisitionDevice { std::mutex m; IBContext context; - ThreadSafeFIFO completion_queue; ThreadSafeFIFO wr_queue; uint64_t mac_addr; uint32_t ipv4_addr; @@ -39,7 +38,6 @@ class MlxRawEthDevice : public AcquisitionDevice { ProcessJFPacket &process); void MeasureThread(); - Completion ReadCompletion() override; void HW_WriteActionRegister(const ActionConfig *job) override; void HW_ReadActionRegister(ActionConfig *job) override; void HW_StartAction() override; diff --git a/receiver/host/MockAcquisitionDevice.cpp b/receiver/host/MockAcquisitionDevice.cpp index f1139f42..1f148d8f 100644 --- a/receiver/host/MockAcquisitionDevice.cpp +++ b/receiver/host/MockAcquisitionDevice.cpp @@ -15,10 +15,6 @@ void MockAcquisitionDevice::HW_WriteActionRegister(const ActionConfig *job) { void MockAcquisitionDevice::HW_StartAction() {} -Completion MockAcquisitionDevice::ReadCompletion() { - return mailbox_fifo.GetBlocking(); -} - void MockAcquisitionDevice::HW_SetCancelDataCollectionBit() { if (logger) logger->Info("MockAcquisitionDevice cancelling " + std::to_string(data_stream)); @@ -36,14 +32,10 @@ bool MockAcquisitionDevice::HW_IsIdle() const { MockAcquisitionDevice::MockAcquisitionDevice(uint16_t data_stream, size_t in_frame_buffer_size_modules) : AcquisitionDevice(data_stream) { - max_modules = 16; - - MapBuffersStandard(in_frame_buffer_size_modules, - (3 + 3 * 16) * max_modules + frames_int_pkt_gen, -1); + MapBuffersStandard(in_frame_buffer_size_modules, 1, -1); max_handle = in_frame_buffer_size_modules; - - mailbox_fifo.Put(Completion{.type = Completion::Type::Start}); + work_completion_queue.Put(Completion{.type = Completion::Type::Start}); } void MockAcquisitionDevice::SendCompletion(uint32_t handle, uint16_t module_number, uint64_t frame_number) { @@ -55,7 +47,7 @@ void MockAcquisitionDevice::SendCompletion(uint32_t handle, uint16_t module_numb c.packet_mask[0] = UINT64_MAX; c.packet_mask[1] = UINT64_MAX; c.packet_count = 128; - mailbox_fifo.Put(c); + work_completion_queue.Put(c); } void MockAcquisitionDevice::AddModule(uint64_t frame_number, uint16_t module_number, const uint16_t *data) { @@ -74,10 +66,11 @@ void MockAcquisitionDevice::AddModule(uint64_t frame_number, uint16_t module_num } void MockAcquisitionDevice::Terminate() { - mailbox_fifo.Put(Completion{ + work_completion_queue.Put(Completion{ .type = Completion::Type::End, .frame_number = current_handle * 128 }); + work_completion_queue.Put(Completion{.type = Completion::Type::Start}); } uint64_t MockAcquisitionDevice::HW_GetMACAddress() const { diff --git a/receiver/host/MockAcquisitionDevice.h b/receiver/host/MockAcquisitionDevice.h index 76bf09c2..60a83a06 100644 --- a/receiver/host/MockAcquisitionDevice.h +++ b/receiver/host/MockAcquisitionDevice.h @@ -8,7 +8,6 @@ #include "../../common/ThreadSafeFIFO.h" class MockAcquisitionDevice : public AcquisitionDevice { - ThreadSafeFIFO mailbox_fifo; uint32_t current_handle = 0; uint32_t max_handle = 0; ActionConfig cfg; @@ -19,8 +18,6 @@ class MockAcquisitionDevice : public AcquisitionDevice { void HW_WriteActionRegister(const ActionConfig *job) override; void HW_StartAction() override; - Completion ReadCompletion() override; - void HW_SetCancelDataCollectionBit() override; bool HW_SendWorkRequest(uint32_t handle) override; bool HW_IsIdle() const override; @@ -30,7 +27,7 @@ class MockAcquisitionDevice : public AcquisitionDevice { void CopyInternalPacketGenFrameToDeviceBuffer() override; public: MockAcquisitionDevice(uint16_t data_stream, size_t in_frame_buffer_size_modules); - void AddModule(uint64_t frame, uint16_t module, const uint16_t *data); + void AddModule(uint64_t frame, uint16_t module_number, const uint16_t *data); void Terminate(); void InitializeCalibration(const DiffractionExperiment &experiment, const JFCalibration &calib) override; }; diff --git a/receiver/host/PCIExpressDevice.cpp b/receiver/host/PCIExpressDevice.cpp index d00d469e..fd5c6249 100644 --- a/receiver/host/PCIExpressDevice.cpp +++ b/receiver/host/PCIExpressDevice.cpp @@ -93,13 +93,13 @@ bool PCIExpressDevice::HW_SendWorkRequest(uint32_t handle) { return true; } -void PCIExpressDevice::HW_StartAction() { +void PCIExpressDevice::FPGA_StartAction() { if (ioctl(fd, IOCTL_JFJOCH_START) != 0) throw JFJochException(JFJochExceptionCategory::PCIeError, "Failed starting action", errno); } -void PCIExpressDevice::HW_EndAction() { +void PCIExpressDevice::FPGA_EndAction() { if (ioctl(fd, IOCTL_JFJOCH_END) != 0) throw JFJochException(JFJochExceptionCategory::PCIeError, "Failed ending action", errno); diff --git a/receiver/host/PCIExpressDevice.h b/receiver/host/PCIExpressDevice.h index b1979c69..5a07de7e 100644 --- a/receiver/host/PCIExpressDevice.h +++ b/receiver/host/PCIExpressDevice.h @@ -12,13 +12,13 @@ class PCIExpressDevice : public FPGAAcquisitionDevice { bool HW_ReadMailbox(uint32_t values[12]); void HW_SetCancelDataCollectionBit() override; bool HW_SendWorkRequest(uint32_t handle) override; - void HW_StartAction() override; + void FPGA_StartAction() override; bool HW_IsIdle() const final; void HW_WriteActionRegister(const ActionConfig *job) override; void HW_ReadActionRegister(ActionConfig *job) override; uint64_t HW_GetMACAddress() const override; uint32_t HW_GetIPv4Address() const override; - void HW_EndAction() override; + void FPGA_EndAction() override; void Reset(); void GetStatus_Internal(ActionStatus *status) const; diff --git a/tests/MockAcquisitionDeviceTest.cpp b/tests/MockAcquisitionDeviceTest.cpp index 6191eae7..e54b8cfe 100644 --- a/tests/MockAcquisitionDeviceTest.cpp +++ b/tests/MockAcquisitionDeviceTest.cpp @@ -15,6 +15,7 @@ TEST_CASE("MockAcquisitionDevice") { device.StartAction(experiment); + device.AddModule(1,0,module_data.data()); device.AddModule(1,1,module_data.data()); device.AddModule(4,0,module_data.data());