AcquisitionDevice: Rework work request queue

This commit is contained in:
2023-04-25 21:43:13 +02:00
parent 960e7d1d4e
commit 16bd6836d2
10 changed files with 46 additions and 69 deletions
+15 -27
View File
@@ -123,8 +123,6 @@ void AcquisitionDevice::StartAction(const DiffractionExperiment &experiment) {
HW_StartAction();
send_work_request_future = std::async(std::launch::async, &AcquisitionDevice::SendWorkRequestThread, this);
for (uint32_t i = 0; i < buffer_device.size(); i++)
SendWorkRequest(i);
@@ -133,6 +131,9 @@ void AcquisitionDevice::StartAction(const DiffractionExperiment &experiment) {
throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError, "Mismatch in completion queue");
start_time = std::chrono::system_clock::now();
if (logger)
logger->Info("Started");
}
void AcquisitionDevice::CopyInternalPacketGenFrameToDeviceBuffer() {
@@ -174,7 +175,7 @@ void AcquisitionDevice::WaitForActionComplete() {
}
if (logger != nullptr)
logger->Debug("Data stream " + std::to_string(data_stream)
logger->Info("Data stream " + std::to_string(data_stream)
+ " completion frame number " + std::to_string(c.frame_number)
+ " module " + std::to_string(c.module)
+ " handle " + std::to_string(c.handle)
@@ -187,23 +188,23 @@ void AcquisitionDevice::WaitForActionComplete() {
counters.SetAcquisitionFinished();
end_time = std::chrono::system_clock::now();
EndWorkRequestAndSignalQueues();
if (logger)
logger->Info("End 1");
HW_SetCancelDataCollectionBit();
if (logger)
logger->Info("End 2");
HW_EndAction();
if (logger)
logger->Info("End 3");
while (!HW_IsIdle())
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
void AcquisitionDevice::EndWorkRequestAndSignalQueues() {
HW_SetCancelDataCollectionBit();
work_request_queue.Put(0); // 0 = end, this is to ensure that in a priority queue end marker is always first to take
send_work_request_future.get();
}
void AcquisitionDevice::SendWorkRequest(uint32_t handle) {
work_request_queue.Put(handle+1);
work_request_queue.Put(WorkRequest{
.ptr = buffer_device.at(handle),
.handle = handle
});
}
uint64_t AcquisitionDevice::GetBytesReceived() const {
@@ -316,19 +317,6 @@ void AcquisitionDevice::UnmapBuffers() {
if (i != nullptr) munmap(i, FPGA_BUFFER_LOCATION_SIZE);
}
void AcquisitionDevice::SendWorkRequestThread() {
auto handle = work_request_queue.GetBlocking();
while (handle != 0) {
// Preferably use the smallest handle (to reduce buffer size for better TLB usage)
// So if work request cannot be sent, return handle and check again for the smallest one
if (!HW_SendWorkRequest(handle - 1)) {
work_request_queue.Put(handle);
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
handle = work_request_queue.GetBlocking();
}
}
void AcquisitionDevice::FrameBufferRelease(size_t frame_number, uint16_t module_number) {
auto handle = counters.GetBufferHandleAndClear(frame_number, module_number);
if (handle != AcquisitionOnlineCounters::HandleNotFound)
+1 -7
View File
@@ -37,13 +37,8 @@ class AcquisitionDevice {
int64_t expected_frames;
AcquisitionOnlineCounters counters;
ThreadSafeSet<uint64_t> work_request_queue;
std::future<void> send_work_request_future;
AcquisitionOfflineCounters completion_vector;
void SendWorkRequestThread();
void EndWorkRequestAndSignalQueues();
virtual void HW_WriteActionRegister(const ActionConfig *job) = 0;
@@ -62,6 +57,7 @@ class AcquisitionDevice {
virtual void CopyInternalPacketGenFrameToDeviceBuffer();
protected:
ThreadSafeFIFO<Completion> work_completion_queue;
ThreadSafeFIFO<WorkRequest> work_request_queue;
std::vector<uint16_t *> buffer_device;
std::vector<uint16_t> internal_pkt_gen_frame;
@@ -74,8 +70,6 @@ protected:
void UnmapBuffers();
void MapBuffersStandard(size_t c2h_buffer_count, size_t h2c_buffer_count, int16_t numa_node);
virtual bool HW_SendWorkRequest(uint32_t handle) = 0;
public:
static constexpr const uint64_t HandleNotValid = UINT64_MAX;
+21
View File
@@ -5,12 +5,19 @@
void FPGAAcquisitionDevice::HW_StartAction() {
FPGA_StartAction();
stop_work_requests = false;
send_work_request_future = std::async(std::launch::async, &FPGAAcquisitionDevice::SendWorkRequestThread, this);
read_work_completion_future = std::async(std::launch::async, &FPGAAcquisitionDevice::ReadWorkCompletionThread, this);
}
void FPGAAcquisitionDevice::HW_EndAction() {
read_work_completion_future.get();
stop_work_requests = true;
send_work_request_future.get();
while (!HW_SendWorkRequest(UINT32_MAX))
std::this_thread::sleep_for(std::chrono::microseconds(10));
@@ -28,4 +35,18 @@ void FPGAAcquisitionDevice::ReadWorkCompletionThread() {
c = parse_hw_completion(values);
work_completion_queue.PutBlocking(c);
} while (c.type != Completion::Type::End);
}
void FPGAAcquisitionDevice::SendWorkRequestThread() {
while (!stop_work_requests) {
WorkRequest wr{};
if (work_request_queue.Get(wr)) {
if ( !HW_SendWorkRequest(wr.handle)) {
work_request_queue.Put(wr);
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
} else {
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
}
}
+7 -2
View File
@@ -7,8 +7,6 @@
#include "AcquisitionDevice.h"
class FPGAAcquisitionDevice : public AcquisitionDevice {
virtual bool HW_ReadMailbox(uint32_t values[12]) = 0;
virtual void FPGA_StartAction() = 0;
virtual void FPGA_EndAction() = 0;
@@ -17,6 +15,13 @@ class FPGAAcquisitionDevice : public AcquisitionDevice {
std::future<void> read_work_completion_future;
void ReadWorkCompletionThread();
std::future<void> send_work_request_future;
volatile bool stop_work_requests = false;
void SendWorkRequestThread();
virtual bool HW_ReadMailbox(uint32_t values[12]) = 0;
virtual bool HW_SendWorkRequest(uint32_t handle) = 0;
protected:
explicit FPGAAcquisitionDevice(uint16_t data_stream) : AcquisitionDevice(data_stream) {}
};
+1 -11
View File
@@ -33,7 +33,7 @@ void LinuxSocketDevice::MeasureThread(int fd) {
});
try {
ProcessJFPacket process(work_completion_queue, wr_queue, max_modules);
ProcessJFPacket process(work_completion_queue, work_request_queue, max_modules);
while (!cancel) {
auto count = recv(fd, &jf, sizeof(jf_udp_payload), 0);
@@ -109,16 +109,6 @@ void LinuxSocketDevice::HW_SetCancelDataCollectionBit() {
cancel = true;
}
bool LinuxSocketDevice::HW_SendWorkRequest(uint32_t handle) {
if (handle != UINT32_MAX)
wr_queue.Put(WorkRequest{
.ptr = buffer_device.at(handle),
.handle = handle
});
return true;
}
void LinuxSocketDevice::HW_GetStatus(ActionStatus *status) const {
memset(status, 0, sizeof(ActionStatus));
-3
View File
@@ -9,8 +9,6 @@
#include "AcquisitionDevice.h"
class LinuxSocketDevice : public AcquisitionDevice {
ThreadSafeFIFO<WorkRequest> wr_queue;
int32_t rcv_buf_size;
uint64_t mac_addr;
@@ -29,7 +27,6 @@ class LinuxSocketDevice : public AcquisitionDevice {
void HW_StartAction() override;
bool HW_IsIdle() const override;
void HW_SetCancelDataCollectionBit() override;
bool HW_SendWorkRequest(uint32_t handle) override;
void HW_GetStatus(ActionStatus *status) const override;
uint64_t HW_GetMACAddress() const override;
uint32_t HW_GetIPv4Address() const override;
+1 -11
View File
@@ -50,7 +50,7 @@ void MlxRawEthDevice::MeasureThread() {
IBCompletionQueue cq(context, BUFFER_COUNT+2);
IBQueuePair qp(pd, cq, 16, BUFFER_COUNT);
IBRegBuffer buffer(pd, BUFFER_COUNT, BUFFER_SIZE, numa_node);
ProcessJFPacket process(work_completion_queue, wr_queue, cfg.nmodules);
ProcessJFPacket process(work_completion_queue, work_request_queue, cfg.nmodules);
qp.Init();
qp.ReadyToReceive();
@@ -107,16 +107,6 @@ void MlxRawEthDevice::HW_SetCancelDataCollectionBit() {
cancel = true;
}
bool MlxRawEthDevice::HW_SendWorkRequest(uint32_t handle) {
if (handle != UINT32_MAX)
wr_queue.Put(WorkRequest{
.ptr = buffer_device.at(handle),
.handle = handle
});
return true;
}
void MlxRawEthDevice::HW_GetStatus(ActionStatus *status) const {
memset(status, 0, sizeof(ActionStatus));
-2
View File
@@ -19,7 +19,6 @@ class MlxRawEthDevice : public AcquisitionDevice {
std::mutex m;
IBContext context;
ThreadSafeFIFO<WorkRequest> wr_queue;
uint64_t mac_addr;
uint32_t ipv4_addr;
ActionConfig cfg;
@@ -43,7 +42,6 @@ class MlxRawEthDevice : public AcquisitionDevice {
void HW_StartAction() override;
bool HW_IsIdle() const override;
void HW_SetCancelDataCollectionBit() override;
bool HW_SendWorkRequest(uint32_t handle) override;
void HW_GetStatus(ActionStatus *status) const override;
uint64_t HW_GetMACAddress() const override;
uint32_t HW_GetIPv4Address() const override;
-5
View File
@@ -21,11 +21,6 @@ void MockAcquisitionDevice::HW_SetCancelDataCollectionBit() {
Terminate();
}
bool MockAcquisitionDevice::HW_SendWorkRequest(uint32_t handle) {
return true;
}
bool MockAcquisitionDevice::HW_IsIdle() const {
return true;
}
-1
View File
@@ -19,7 +19,6 @@ class MockAcquisitionDevice : public AcquisitionDevice {
void HW_StartAction() override;
void HW_SetCancelDataCollectionBit() override;
bool HW_SendWorkRequest(uint32_t handle) override;
bool HW_IsIdle() const override;
uint64_t HW_GetMACAddress() const override;
uint32_t HW_GetIPv4Address() const override;