AcquisitionDevice: Completion queue is handled by particular implementation of the device

This commit is contained in:
2023-04-25 15:58:07 +02:00
parent f01f2e79d1
commit bf2a23ef7e
15 changed files with 64 additions and 71 deletions

View File

@@ -118,22 +118,20 @@ void AcquisitionDevice::StartAction(const DiffractionExperiment &experiment) {
throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError,
"Mismatch between expected and actual values of configuration registers (#modules)");
// Ensure internal queues are empty
// Ensure internal WR queue is empty
work_request_queue.Clear();
work_completion_queue.Clear();
HW_StartAction();
send_work_request_future = std::async(std::launch::async, &AcquisitionDevice::SendWorkRequestThread, this);
read_work_completion_future = std::async(std::launch::async, &AcquisitionDevice::ReadWorkCompletionThread, this);
for (uint32_t i = 0; i < buffer_device.size(); i++)
SendWorkRequest(i);
auto c = work_completion_queue.GetBlocking();
if (c.type != Completion::Type::Start)
throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError, "Mismatch in completion queue");
for (uint32_t i = 0; i < buffer_device.size(); i++)
SendWorkRequest(i);
start_time = std::chrono::system_clock::now();
}
@@ -162,14 +160,6 @@ const std::vector<uint16_t> &AcquisitionDevice::GetInternalGeneratorFrame() cons
return internal_pkt_gen_frame;
}
void AcquisitionDevice::ReadWorkCompletionThread() {
Completion c;
do {
c = ReadCompletion();
work_completion_queue.PutBlocking(c);
} while (c.type != Completion::Type::End);
}
void AcquisitionDevice::WaitForActionComplete() {
auto c = work_completion_queue.GetBlocking();
@@ -193,7 +183,7 @@ void AcquisitionDevice::WaitForActionComplete() {
c = work_completion_queue.GetBlocking();
}
bytes_received = c.frame_number * 8192LU;
read_work_completion_future.get();
counters.SetAcquisitionFinished();
end_time = std::chrono::system_clock::now();

View File

@@ -38,10 +38,6 @@ class AcquisitionDevice {
int64_t expected_frames;
AcquisitionOnlineCounters counters;
ThreadSafeFIFO<Completion> work_completion_queue;
std::future<void> read_work_completion_future;
void ReadWorkCompletionThread();
ThreadSafeSet<uint64_t> work_request_queue;
std::future<void> send_work_request_future;
@@ -50,15 +46,12 @@ class AcquisitionDevice {
void SendWorkRequestThread();
void EndWorkRequestAndSignalQueues();
virtual Completion ReadCompletion() = 0;
virtual void HW_WriteActionRegister(const ActionConfig *job) = 0;
virtual void HW_ReadActionRegister(ActionConfig *job) = 0;
virtual void HW_StartAction() = 0;
virtual bool HW_IsIdle() const = 0;
virtual void HW_SetCancelDataCollectionBit() = 0;
virtual bool HW_SendWorkRequest(uint32_t handle) = 0;
virtual void HW_GetStatus(ActionStatus *status) const = 0;
virtual void HW_GetEnvParams(ActionEnvParams *status) const {
memset(status, 0, sizeof(ActionEnvParams));
@@ -68,6 +61,8 @@ class AcquisitionDevice {
virtual void HW_EndAction() {}; // do clean-up after action is done
virtual void CopyInternalPacketGenFrameToDeviceBuffer();
protected:
ThreadSafeFIFO<Completion> work_completion_queue;
std::vector<uint16_t *> buffer_device;
std::vector<uint16_t> internal_pkt_gen_frame;
@@ -79,6 +74,8 @@ protected:
void UnmapBuffers();
void MapBuffersStandard(size_t c2h_buffer_count, size_t h2c_buffer_count, int16_t numa_node);
virtual bool HW_SendWorkRequest(uint32_t handle) = 0;
public:
static constexpr const uint64_t HandleNotValid = UINT64_MAX;

View File

@@ -3,11 +3,25 @@
#include "FPGAAcquisitionDevice.h"
Completion FPGAAcquisitionDevice::ReadCompletion() {
uint32_t values[12];
while (!HW_ReadMailbox(values)) {
// The receiving FIFO level is less than or equal to the RIT threshold
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
return parse_hw_completion(values);
void FPGAAcquisitionDevice::HW_StartAction() {
FPGA_StartAction();
read_work_completion_future = std::async(std::launch::async, &FPGAAcquisitionDevice::ReadWorkCompletionThread, this);
}
void FPGAAcquisitionDevice::HW_EndAction() {
read_work_completion_future.get();
FPGA_EndAction();
}
void FPGAAcquisitionDevice::ReadWorkCompletionThread() {
uint32_t values[12];
Completion c;
do {
while (!HW_ReadMailbox(values))
std::this_thread::sleep_for(std::chrono::microseconds(10));
c = parse_hw_completion(values);
work_completion_queue.PutBlocking(c);
} while (c.type != Completion::Type::End);
}

View File

@@ -8,7 +8,15 @@
class FPGAAcquisitionDevice : public AcquisitionDevice {
virtual bool HW_ReadMailbox(uint32_t values[12]) = 0;
Completion ReadCompletion() override;
virtual void FPGA_StartAction() = 0;
virtual void FPGA_EndAction() = 0;
void HW_StartAction() final;
void HW_EndAction() final;
std::future<void> read_work_completion_future;
void ReadWorkCompletionThread();
protected:
explicit FPGAAcquisitionDevice(uint16_t data_stream) : AcquisitionDevice(data_stream) {}
};

View File

@@ -135,7 +135,7 @@ void HLSSimulatedDevice::HW_WriteActionRegister(const ActionConfig *job) {
memcpy(&cfg, job, sizeof(ActionConfig));
}
void HLSSimulatedDevice::HW_StartAction() {
void HLSSimulatedDevice::FPGA_StartAction() {
if (action_thread.joinable())
action_thread.join();
@@ -145,6 +145,11 @@ void HLSSimulatedDevice::HW_StartAction() {
action_thread = std::thread(&HLSSimulatedDevice::HLSMainThread, this );
}
void HLSSimulatedDevice::FPGA_EndAction() {
if (action_thread.joinable())
action_thread.join();
}
HLSSimulatedDevice::~HLSSimulatedDevice() {
if (action_thread.joinable())
action_thread.join();

View File

@@ -39,7 +39,8 @@ class HLSSimulatedDevice : public FPGAAcquisitionDevice {
void HW_ReadActionRegister(ActionConfig *job) override;
void HW_WriteActionRegister(const ActionConfig *job) override;
void HW_StartAction() override;
void FPGA_StartAction() override;
void FPGA_EndAction() override;
bool HW_IsIdle() const override;
bool HW_ReadMailbox(uint32_t values[12]);
void HW_SetCancelDataCollectionBit() override;

View File

@@ -28,12 +28,12 @@ void LinuxSocketDevice::MeasureThread(int fd) {
uint64_t packet_count = 0;
completion_queue.Put(Completion{
work_completion_queue.Put(Completion{
.type = Completion::Type::Start
});
try {
ProcessJFPacket process(completion_queue, wr_queue, max_modules);
ProcessJFPacket process(work_completion_queue, wr_queue, max_modules);
while (!cancel) {
auto count = recv(fd, &jf, sizeof(jf_udp_payload), 0);
@@ -49,7 +49,7 @@ void LinuxSocketDevice::MeasureThread(int fd) {
logger->ErrorException(e);
}
// End message should be sent always
completion_queue.Put(Completion{
work_completion_queue.Put(Completion{
.type = Completion::Type::End,
.frame_number = packet_count
});
@@ -57,10 +57,6 @@ void LinuxSocketDevice::MeasureThread(int fd) {
idle = true;
}
Completion LinuxSocketDevice::ReadCompletion() {
return completion_queue.GetBlocking();
}
void LinuxSocketDevice::HW_WriteActionRegister(const ActionConfig *job) {
memcpy(&cfg, job, sizeof(ActionConfig));
}

View File

@@ -9,10 +9,8 @@
#include "AcquisitionDevice.h"
class LinuxSocketDevice : public AcquisitionDevice {
ThreadSafeFIFO<Completion> completion_queue;
ThreadSafeFIFO<ProcessWorkRequest> wr_queue;
int32_t rcv_buf_size;
uint64_t mac_addr;
@@ -26,7 +24,6 @@ class LinuxSocketDevice : public AcquisitionDevice {
volatile bool cancel = false;
volatile bool idle = true;
Completion ReadCompletion() override;
void HW_WriteActionRegister(const ActionConfig *job) override;
void HW_ReadActionRegister(ActionConfig *job) override;
void HW_StartAction() override;

View File

@@ -29,10 +29,6 @@ int32_t MlxRawEthDevice::GetNUMANode() const {
return numa_node;
}
Completion MlxRawEthDevice::ReadCompletion() {
return completion_queue.GetBlocking();
}
void MlxRawEthDevice::HW_WriteActionRegister(const ActionConfig *job) {
memcpy(&cfg, job, sizeof(ActionConfig));
}
@@ -45,7 +41,7 @@ void MlxRawEthDevice::MeasureThread() {
uint64_t packet_count = 0;
completion_queue.Put(Completion{
work_completion_queue.Put(Completion{
.type = Completion::Type::Start
});
@@ -54,7 +50,7 @@ void MlxRawEthDevice::MeasureThread() {
IBCompletionQueue cq(context, BUFFER_COUNT+2);
IBQueuePair qp(pd, cq, 16, BUFFER_COUNT);
IBRegBuffer buffer(pd, BUFFER_COUNT, BUFFER_SIZE, numa_node);
ProcessJFPacket process(completion_queue, wr_queue, cfg.nmodules);
ProcessJFPacket process(work_completion_queue, wr_queue, cfg.nmodules);
qp.Init();
qp.ReadyToReceive();
@@ -86,7 +82,7 @@ void MlxRawEthDevice::MeasureThread() {
logger->ErrorException(e);
}
completion_queue.Put(Completion{
work_completion_queue.Put(Completion{
.type = Completion::Type::End,
.frame_number = packet_count
});

View File

@@ -19,7 +19,6 @@ class MlxRawEthDevice : public AcquisitionDevice {
std::mutex m;
IBContext context;
ThreadSafeFIFO<Completion> completion_queue;
ThreadSafeFIFO<ProcessWorkRequest> wr_queue;
uint64_t mac_addr;
uint32_t ipv4_addr;
@@ -39,7 +38,6 @@ class MlxRawEthDevice : public AcquisitionDevice {
ProcessJFPacket &process);
void MeasureThread();
Completion ReadCompletion() override;
void HW_WriteActionRegister(const ActionConfig *job) override;
void HW_ReadActionRegister(ActionConfig *job) override;
void HW_StartAction() override;

View File

@@ -15,10 +15,6 @@ void MockAcquisitionDevice::HW_WriteActionRegister(const ActionConfig *job) {
void MockAcquisitionDevice::HW_StartAction() {}
Completion MockAcquisitionDevice::ReadCompletion() {
return mailbox_fifo.GetBlocking();
}
void MockAcquisitionDevice::HW_SetCancelDataCollectionBit() {
if (logger)
logger->Info("MockAcquisitionDevice cancelling " + std::to_string(data_stream));
@@ -36,14 +32,10 @@ bool MockAcquisitionDevice::HW_IsIdle() const {
MockAcquisitionDevice::MockAcquisitionDevice(uint16_t data_stream, size_t in_frame_buffer_size_modules)
: AcquisitionDevice(data_stream) {
max_modules = 16;
MapBuffersStandard(in_frame_buffer_size_modules,
(3 + 3 * 16) * max_modules + frames_int_pkt_gen, -1);
MapBuffersStandard(in_frame_buffer_size_modules, 1, -1);
max_handle = in_frame_buffer_size_modules;
mailbox_fifo.Put(Completion{.type = Completion::Type::Start});
work_completion_queue.Put(Completion{.type = Completion::Type::Start});
}
void MockAcquisitionDevice::SendCompletion(uint32_t handle, uint16_t module_number, uint64_t frame_number) {
@@ -55,7 +47,7 @@ void MockAcquisitionDevice::SendCompletion(uint32_t handle, uint16_t module_numb
c.packet_mask[0] = UINT64_MAX;
c.packet_mask[1] = UINT64_MAX;
c.packet_count = 128;
mailbox_fifo.Put(c);
work_completion_queue.Put(c);
}
void MockAcquisitionDevice::AddModule(uint64_t frame_number, uint16_t module_number, const uint16_t *data) {
@@ -74,10 +66,11 @@ void MockAcquisitionDevice::AddModule(uint64_t frame_number, uint16_t module_num
}
void MockAcquisitionDevice::Terminate() {
mailbox_fifo.Put(Completion{
work_completion_queue.Put(Completion{
.type = Completion::Type::End,
.frame_number = current_handle * 128
});
work_completion_queue.Put(Completion{.type = Completion::Type::Start});
}
uint64_t MockAcquisitionDevice::HW_GetMACAddress() const {

View File

@@ -8,7 +8,6 @@
#include "../../common/ThreadSafeFIFO.h"
class MockAcquisitionDevice : public AcquisitionDevice {
ThreadSafeFIFO<Completion> mailbox_fifo;
uint32_t current_handle = 0;
uint32_t max_handle = 0;
ActionConfig cfg;
@@ -19,8 +18,6 @@ class MockAcquisitionDevice : public AcquisitionDevice {
void HW_WriteActionRegister(const ActionConfig *job) override;
void HW_StartAction() override;
Completion ReadCompletion() override;
void HW_SetCancelDataCollectionBit() override;
bool HW_SendWorkRequest(uint32_t handle) override;
bool HW_IsIdle() const override;
@@ -30,7 +27,7 @@ class MockAcquisitionDevice : public AcquisitionDevice {
void CopyInternalPacketGenFrameToDeviceBuffer() override;
public:
MockAcquisitionDevice(uint16_t data_stream, size_t in_frame_buffer_size_modules);
void AddModule(uint64_t frame, uint16_t module, const uint16_t *data);
void AddModule(uint64_t frame, uint16_t module_number, const uint16_t *data);
void Terminate();
void InitializeCalibration(const DiffractionExperiment &experiment, const JFCalibration &calib) override;
};

View File

@@ -93,13 +93,13 @@ bool PCIExpressDevice::HW_SendWorkRequest(uint32_t handle) {
return true;
}
void PCIExpressDevice::HW_StartAction() {
void PCIExpressDevice::FPGA_StartAction() {
if (ioctl(fd, IOCTL_JFJOCH_START) != 0)
throw JFJochException(JFJochExceptionCategory::PCIeError,
"Failed starting action", errno);
}
void PCIExpressDevice::HW_EndAction() {
void PCIExpressDevice::FPGA_EndAction() {
if (ioctl(fd, IOCTL_JFJOCH_END) != 0)
throw JFJochException(JFJochExceptionCategory::PCIeError,
"Failed ending action", errno);

View File

@@ -12,13 +12,13 @@ class PCIExpressDevice : public FPGAAcquisitionDevice {
bool HW_ReadMailbox(uint32_t values[12]);
void HW_SetCancelDataCollectionBit() override;
bool HW_SendWorkRequest(uint32_t handle) override;
void HW_StartAction() override;
void FPGA_StartAction() override;
bool HW_IsIdle() const final;
void HW_WriteActionRegister(const ActionConfig *job) override;
void HW_ReadActionRegister(ActionConfig *job) override;
uint64_t HW_GetMACAddress() const override;
uint32_t HW_GetIPv4Address() const override;
void HW_EndAction() override;
void FPGA_EndAction() override;
void Reset();
void GetStatus_Internal(ActionStatus *status) const;

View File

@@ -15,6 +15,7 @@ TEST_CASE("MockAcquisitionDevice") {
device.StartAction(experiment);
device.AddModule(1,0,module_data.data());
device.AddModule(1,1,module_data.data());
device.AddModule(4,0,module_data.data());