diff --git a/receiver/host/AcquisitionDevice.cpp b/receiver/host/AcquisitionDevice.cpp index 8df7bf49..960aabfc 100644 --- a/receiver/host/AcquisitionDevice.cpp +++ b/receiver/host/AcquisitionDevice.cpp @@ -123,8 +123,6 @@ void AcquisitionDevice::StartAction(const DiffractionExperiment &experiment) { HW_StartAction(); - send_work_request_future = std::async(std::launch::async, &AcquisitionDevice::SendWorkRequestThread, this); - for (uint32_t i = 0; i < buffer_device.size(); i++) SendWorkRequest(i); @@ -133,6 +131,9 @@ void AcquisitionDevice::StartAction(const DiffractionExperiment &experiment) { throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError, "Mismatch in completion queue"); start_time = std::chrono::system_clock::now(); + + if (logger) + logger->Info("Started"); } void AcquisitionDevice::CopyInternalPacketGenFrameToDeviceBuffer() { @@ -174,7 +175,7 @@ void AcquisitionDevice::WaitForActionComplete() { } if (logger != nullptr) - logger->Debug("Data stream " + std::to_string(data_stream) + logger->Info("Data stream " + std::to_string(data_stream) + " completion frame number " + std::to_string(c.frame_number) + " module " + std::to_string(c.module) + " handle " + std::to_string(c.handle) @@ -187,23 +188,23 @@ void AcquisitionDevice::WaitForActionComplete() { counters.SetAcquisitionFinished(); end_time = std::chrono::system_clock::now(); - - EndWorkRequestAndSignalQueues(); - + if (logger) + logger->Info("End 1"); + HW_SetCancelDataCollectionBit(); + if (logger) + logger->Info("End 2"); HW_EndAction(); - + if (logger) + logger->Info("End 3"); while (!HW_IsIdle()) std::this_thread::sleep_for(std::chrono::milliseconds(1)); } -void AcquisitionDevice::EndWorkRequestAndSignalQueues() { - HW_SetCancelDataCollectionBit(); - work_request_queue.Put(0); // 0 = end, this is to ensure that in a priority queue end marker is always first to take - send_work_request_future.get(); -} - void AcquisitionDevice::SendWorkRequest(uint32_t handle) { - work_request_queue.Put(handle+1); + work_request_queue.Put(WorkRequest{ + .ptr = buffer_device.at(handle), + .handle = handle + }); } uint64_t AcquisitionDevice::GetBytesReceived() const { @@ -316,19 +317,6 @@ void AcquisitionDevice::UnmapBuffers() { if (i != nullptr) munmap(i, FPGA_BUFFER_LOCATION_SIZE); } -void AcquisitionDevice::SendWorkRequestThread() { - auto handle = work_request_queue.GetBlocking(); - while (handle != 0) { - // Preferably use the smallest handle (to reduce buffer size for better TLB usage) - // So if work request cannot be sent, return handle and check again for the smallest one - if (!HW_SendWorkRequest(handle - 1)) { - work_request_queue.Put(handle); - std::this_thread::sleep_for(std::chrono::microseconds(10)); - } - handle = work_request_queue.GetBlocking(); - } -} - void AcquisitionDevice::FrameBufferRelease(size_t frame_number, uint16_t module_number) { auto handle = counters.GetBufferHandleAndClear(frame_number, module_number); if (handle != AcquisitionOnlineCounters::HandleNotFound) diff --git a/receiver/host/AcquisitionDevice.h b/receiver/host/AcquisitionDevice.h index 946d8f62..98ae5a93 100644 --- a/receiver/host/AcquisitionDevice.h +++ b/receiver/host/AcquisitionDevice.h @@ -37,13 +37,8 @@ class AcquisitionDevice { int64_t expected_frames; AcquisitionOnlineCounters counters; - - ThreadSafeSet work_request_queue; - std::future send_work_request_future; - AcquisitionOfflineCounters completion_vector; - void SendWorkRequestThread(); void EndWorkRequestAndSignalQueues(); virtual void HW_WriteActionRegister(const ActionConfig *job) = 0; @@ -62,6 +57,7 @@ class AcquisitionDevice { virtual void CopyInternalPacketGenFrameToDeviceBuffer(); protected: ThreadSafeFIFO work_completion_queue; + ThreadSafeFIFO work_request_queue; std::vector buffer_device; std::vector internal_pkt_gen_frame; @@ -74,8 +70,6 @@ protected: void UnmapBuffers(); void MapBuffersStandard(size_t c2h_buffer_count, size_t h2c_buffer_count, int16_t numa_node); - - virtual bool HW_SendWorkRequest(uint32_t handle) = 0; public: static constexpr const uint64_t HandleNotValid = UINT64_MAX; diff --git a/receiver/host/FPGAAcquisitionDevice.cpp b/receiver/host/FPGAAcquisitionDevice.cpp index d69747f1..a97aeaaa 100644 --- a/receiver/host/FPGAAcquisitionDevice.cpp +++ b/receiver/host/FPGAAcquisitionDevice.cpp @@ -5,12 +5,19 @@ void FPGAAcquisitionDevice::HW_StartAction() { FPGA_StartAction(); + + stop_work_requests = false; + send_work_request_future = std::async(std::launch::async, &FPGAAcquisitionDevice::SendWorkRequestThread, this); + read_work_completion_future = std::async(std::launch::async, &FPGAAcquisitionDevice::ReadWorkCompletionThread, this); } void FPGAAcquisitionDevice::HW_EndAction() { read_work_completion_future.get(); + stop_work_requests = true; + send_work_request_future.get(); + while (!HW_SendWorkRequest(UINT32_MAX)) std::this_thread::sleep_for(std::chrono::microseconds(10)); @@ -28,4 +35,18 @@ void FPGAAcquisitionDevice::ReadWorkCompletionThread() { c = parse_hw_completion(values); work_completion_queue.PutBlocking(c); } while (c.type != Completion::Type::End); +} + +void FPGAAcquisitionDevice::SendWorkRequestThread() { + while (!stop_work_requests) { + WorkRequest wr{}; + if (work_request_queue.Get(wr)) { + if ( !HW_SendWorkRequest(wr.handle)) { + work_request_queue.Put(wr); + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + } else { + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + } } \ No newline at end of file diff --git a/receiver/host/FPGAAcquisitionDevice.h b/receiver/host/FPGAAcquisitionDevice.h index ee690741..e754ed2b 100644 --- a/receiver/host/FPGAAcquisitionDevice.h +++ b/receiver/host/FPGAAcquisitionDevice.h @@ -7,8 +7,6 @@ #include "AcquisitionDevice.h" class FPGAAcquisitionDevice : public AcquisitionDevice { - virtual bool HW_ReadMailbox(uint32_t values[12]) = 0; - virtual void FPGA_StartAction() = 0; virtual void FPGA_EndAction() = 0; @@ -17,6 +15,13 @@ class FPGAAcquisitionDevice : public AcquisitionDevice { std::future read_work_completion_future; void ReadWorkCompletionThread(); + + std::future send_work_request_future; + volatile bool stop_work_requests = false; + void SendWorkRequestThread(); + + virtual bool HW_ReadMailbox(uint32_t values[12]) = 0; + virtual bool HW_SendWorkRequest(uint32_t handle) = 0; protected: explicit FPGAAcquisitionDevice(uint16_t data_stream) : AcquisitionDevice(data_stream) {} }; diff --git a/receiver/host/LinuxSocketDevice.cpp b/receiver/host/LinuxSocketDevice.cpp index 0dc1477f..d13c9e71 100644 --- a/receiver/host/LinuxSocketDevice.cpp +++ b/receiver/host/LinuxSocketDevice.cpp @@ -33,7 +33,7 @@ void LinuxSocketDevice::MeasureThread(int fd) { }); try { - ProcessJFPacket process(work_completion_queue, wr_queue, max_modules); + ProcessJFPacket process(work_completion_queue, work_request_queue, max_modules); while (!cancel) { auto count = recv(fd, &jf, sizeof(jf_udp_payload), 0); @@ -109,16 +109,6 @@ void LinuxSocketDevice::HW_SetCancelDataCollectionBit() { cancel = true; } -bool LinuxSocketDevice::HW_SendWorkRequest(uint32_t handle) { - if (handle != UINT32_MAX) - wr_queue.Put(WorkRequest{ - .ptr = buffer_device.at(handle), - .handle = handle - }); - - return true; -} - void LinuxSocketDevice::HW_GetStatus(ActionStatus *status) const { memset(status, 0, sizeof(ActionStatus)); diff --git a/receiver/host/LinuxSocketDevice.h b/receiver/host/LinuxSocketDevice.h index 04f35b54..f56f7c77 100644 --- a/receiver/host/LinuxSocketDevice.h +++ b/receiver/host/LinuxSocketDevice.h @@ -9,8 +9,6 @@ #include "AcquisitionDevice.h" class LinuxSocketDevice : public AcquisitionDevice { - ThreadSafeFIFO wr_queue; - int32_t rcv_buf_size; uint64_t mac_addr; @@ -29,7 +27,6 @@ class LinuxSocketDevice : public AcquisitionDevice { void HW_StartAction() override; bool HW_IsIdle() const override; void HW_SetCancelDataCollectionBit() override; - bool HW_SendWorkRequest(uint32_t handle) override; void HW_GetStatus(ActionStatus *status) const override; uint64_t HW_GetMACAddress() const override; uint32_t HW_GetIPv4Address() const override; diff --git a/receiver/host/MlxRawEthDevice.cpp b/receiver/host/MlxRawEthDevice.cpp index 312bcef2..2ddd55e7 100644 --- a/receiver/host/MlxRawEthDevice.cpp +++ b/receiver/host/MlxRawEthDevice.cpp @@ -50,7 +50,7 @@ void MlxRawEthDevice::MeasureThread() { IBCompletionQueue cq(context, BUFFER_COUNT+2); IBQueuePair qp(pd, cq, 16, BUFFER_COUNT); IBRegBuffer buffer(pd, BUFFER_COUNT, BUFFER_SIZE, numa_node); - ProcessJFPacket process(work_completion_queue, wr_queue, cfg.nmodules); + ProcessJFPacket process(work_completion_queue, work_request_queue, cfg.nmodules); qp.Init(); qp.ReadyToReceive(); @@ -107,16 +107,6 @@ void MlxRawEthDevice::HW_SetCancelDataCollectionBit() { cancel = true; } -bool MlxRawEthDevice::HW_SendWorkRequest(uint32_t handle) { - if (handle != UINT32_MAX) - wr_queue.Put(WorkRequest{ - .ptr = buffer_device.at(handle), - .handle = handle - }); - - return true; -} - void MlxRawEthDevice::HW_GetStatus(ActionStatus *status) const { memset(status, 0, sizeof(ActionStatus)); diff --git a/receiver/host/MlxRawEthDevice.h b/receiver/host/MlxRawEthDevice.h index 42a7709e..a3f126e1 100644 --- a/receiver/host/MlxRawEthDevice.h +++ b/receiver/host/MlxRawEthDevice.h @@ -19,7 +19,6 @@ class MlxRawEthDevice : public AcquisitionDevice { std::mutex m; IBContext context; - ThreadSafeFIFO wr_queue; uint64_t mac_addr; uint32_t ipv4_addr; ActionConfig cfg; @@ -43,7 +42,6 @@ class MlxRawEthDevice : public AcquisitionDevice { void HW_StartAction() override; bool HW_IsIdle() const override; void HW_SetCancelDataCollectionBit() override; - bool HW_SendWorkRequest(uint32_t handle) override; void HW_GetStatus(ActionStatus *status) const override; uint64_t HW_GetMACAddress() const override; uint32_t HW_GetIPv4Address() const override; diff --git a/receiver/host/MockAcquisitionDevice.cpp b/receiver/host/MockAcquisitionDevice.cpp index 1f148d8f..1571aab9 100644 --- a/receiver/host/MockAcquisitionDevice.cpp +++ b/receiver/host/MockAcquisitionDevice.cpp @@ -21,11 +21,6 @@ void MockAcquisitionDevice::HW_SetCancelDataCollectionBit() { Terminate(); } -bool MockAcquisitionDevice::HW_SendWorkRequest(uint32_t handle) { - return true; -} - - bool MockAcquisitionDevice::HW_IsIdle() const { return true; } diff --git a/receiver/host/MockAcquisitionDevice.h b/receiver/host/MockAcquisitionDevice.h index 60a83a06..0af04f15 100644 --- a/receiver/host/MockAcquisitionDevice.h +++ b/receiver/host/MockAcquisitionDevice.h @@ -19,7 +19,6 @@ class MockAcquisitionDevice : public AcquisitionDevice { void HW_StartAction() override; void HW_SetCancelDataCollectionBit() override; - bool HW_SendWorkRequest(uint32_t handle) override; bool HW_IsIdle() const override; uint64_t HW_GetMACAddress() const override; uint32_t HW_GetIPv4Address() const override;