ProcessJFPacket: Doesn't include JF conversion

This commit is contained in:
2023-04-07 13:21:06 +02:00
parent 2c9d97c198
commit bffaa17bcd
4 changed files with 34 additions and 103 deletions

View File

@@ -12,8 +12,7 @@ ProcessJFPacket::ProcessJFPacket(ThreadSafeFIFO<Completion> &in_c, ThreadSafeFIF
: m(2 * nmodules),
c_fifo(in_c),
wr_fifo(in_wr),
module_info(2 * nmodules),
conversion(nmodules, nullptr)
module_info(2 * nmodules)
{
for (auto &i: module_info)
i.c.frame_number = UINT64_MAX;
@@ -26,11 +25,6 @@ ProcessJFPacket::~ProcessJFPacket() {
}
}
void ProcessJFPacket::RegisterConversion(uint32_t module_number, JFConversion *conv) {
do_conversion = true;
conversion.at(module_number) = conv;
}
void ProcessJFPacket::ProcessPacket(jf_udp_payload *datagram, uint32_t src_ip) {
if (datagram->framenum == 0)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Frame number cannot be zero");
@@ -40,56 +34,41 @@ void ProcessJFPacket::ProcessPacket(jf_udp_payload *datagram, uint32_t src_ip) {
uint32_t module_number = (ipv4_src_addr_host % 32) / 2;
bool second_half_module = (ipv4_src_addr_host % 2 == 1);
uint32_t packetnum = datagram->packetnum | (second_half_module ? 64 : 0);
uint64_t counter = (module_number * 2) | (frame_number % 2);
uint64_t module_info_location = (module_number * 2) | (frame_number % 2);
if (counter > module_info.size())
if (module_info_location > module_info.size())
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Module number out of bounds");
{
std::shared_lock<std::shared_mutex> sl(m[counter]);
if (module_info[counter].c.frame_number != frame_number) {
sl.unlock();
{
std::unique_lock<std::shared_mutex> ul(m[counter]);
// condition is reevaluated inside unique_lock
if (module_info[counter].c.frame_number != frame_number) {
if (module_info[counter].c.frame_number != UINT64_MAX)
c_fifo.Put(module_info[counter].c);
std::unique_lock<std::mutex> ul(m[module_info_location]);
if (module_info[module_info_location].c.frame_number != frame_number) {
if (module_info[module_info_location].c.frame_number != UINT64_MAX)
c_fifo.Put(module_info[module_info_location].c);
auto wr = wr_fifo.GetBlocking();
auto wr = wr_fifo.GetBlocking();
module_info[counter].c.type = Completion::Type::Image;
module_info[counter].c.frame_number = frame_number;
module_info[counter].c.timestamp = datagram->timestamp;
module_info[counter].c.bunchid = datagram->bunchid;
module_info[counter].c.debug = datagram->debug;
module_info[counter].c.packet_mask[0] = 0;
module_info[counter].c.packet_mask[1] = 0;
module_info[counter].c.packet_count = 0;
module_info[counter].c.module = module_number;
module_info[module_info_location].c.type = Completion::Type::Image;
module_info[module_info_location].c.frame_number = frame_number;
module_info[module_info_location].c.timestamp = datagram->timestamp;
module_info[module_info_location].c.bunchid = datagram->bunchid;
module_info[module_info_location].c.debug = datagram->debug;
module_info[module_info_location].c.packet_mask[0] = 0;
module_info[module_info_location].c.packet_mask[1] = 0;
module_info[module_info_location].c.packet_count = 0;
module_info[module_info_location].c.module = module_number;
module_info[counter].c.handle = wr.handle;
module_info[counter].ptr = wr.ptr;
}
}
sl.lock();
module_info[module_info_location].c.handle = wr.handle;
module_info[module_info_location].ptr = wr.ptr;
}
module_info[counter].c.packet_count++;
module_info[counter].c.packet_mask[packetnum >= 64 ? 1 : 0] |= (1LU << (packetnum % 64));
module_info[module_info_location].c.packet_count++;
module_info[module_info_location].c.packet_mask[packetnum >= 64 ? 1 : 0] |= (1LU << (packetnum % 64));
if (!do_conversion)
memcpy(module_info[counter].ptr + 4096 * packetnum, datagram->data, 4096 * sizeof(uint16_t));
else {
if (conversion[module_number] == nullptr)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Conversion procedure not registered");
conversion[module_number]->ConvertPacket((int16_t *) (module_info[counter].ptr + 4096 * packetnum),
datagram->data, packetnum);
}
memcpy(module_info[module_info_location].ptr + 4096 * packetnum, datagram->data, 4096 * sizeof(uint16_t));
}
counter++;
packet_counter++;
}
uint64_t ProcessJFPacket::GetCounter() {
return counter;
return packet_counter;
}

View File

@@ -22,17 +22,14 @@ struct ModuleInfo {
};
class ProcessJFPacket {
std::vector<std::shared_mutex> m;
std::vector<std::mutex> m;
ThreadSafeFIFO<Completion> &c_fifo;
ThreadSafeFIFO<ProcessWorkRequest> &wr_fifo;
std::vector<JFConversion *> conversion;
std::vector<ModuleInfo> module_info;
bool do_conversion = false;
std::atomic<uint64_t> counter = 0;
std::atomic<uint64_t> packet_counter = 0;
public:
ProcessJFPacket(ThreadSafeFIFO<Completion> &c, ThreadSafeFIFO<ProcessWorkRequest> &wr, uint32_t nmodules);
~ProcessJFPacket();
void RegisterConversion(uint32_t module_number, JFConversion* conv);
void ProcessPacket(jf_udp_payload *datagram, uint32_t src_ip);
uint64_t GetCounter();
};

View File

@@ -12,6 +12,8 @@ TEST_CASE("ProcessRawPacketTest_Empty") {
ThreadSafeFIFO<ProcessWorkRequest> wr_fifo;
{
ProcessJFPacket process(c_fifo, wr_fifo, 2);
REQUIRE(process.GetCounter() == 0);
}
REQUIRE(c_fifo.Size() == 0);
}
@@ -50,6 +52,8 @@ TEST_CASE("ProcessRawPacketTest") {
datagram.bunchid = 84;
datagram.data[0] = 6346;
process.ProcessPacket(&datagram, experiment.GetSrcIPv4Address(1,7));
REQUIRE(process.GetCounter() == 3);
}
REQUIRE(c_fifo.Size() == 3);

View File

@@ -151,19 +151,11 @@ template <class T> void test_conversion_with_geom(Logger &logger) {
ntries * nframes * nmodules * RAW_MODULE_SIZE * sizeof(uint16_t) * 1000 * 1000/ ((double) elapsed.count() * 1024 * 1024 * 1024));
}
void process_thread(std::vector<jf_raw_packet> *packets, ProcessJFPacket *process,
uint32_t stride, uint32_t start) {
for (uint32_t i = start; i < packets->size(); i += stride)
process->ProcessPacket(&packets->at(i).jf, packets->at(i).ipv4_header_sour_ip);
}
void test_packet_processing(Logger &logger, uint32_t nthreads, bool conversion) {
void test_packet_processing(Logger &logger) {
size_t nframes = 128;
int64_t nmodules = 8;
int64_t ntries = 8;
DiffractionExperiment x(1,{nmodules});
std::vector<jf_raw_packet> packets(nframes * nmodules * 128);
std::vector<uint16_t> output(nframes * nmodules * CONVERTED_MODULE_SIZE);
@@ -182,48 +174,19 @@ void test_packet_processing(Logger &logger, uint32_t nthreads, bool conversion)
}
}
std::vector<JFConversionFixedPoint> v(nmodules);
JFModuleGainCalibration gain_calib = GainCalibrationFromTestFile();
for (int m = 0; m < nmodules; m++) {
JFModulePedestal pedestal_g0;
JFModulePedestal pedestal_g1;
JFModulePedestal pedestal_g2;
for (int i = 0; i < RAW_MODULE_SIZE; i++) {
pedestal_g0.GetPedestal()[i] = 3000 + i % 50 + m * 135;
pedestal_g1.GetPedestal()[i] = 15000 + i % 50 - m * 135;
pedestal_g2.GetPedestal()[i] = 14000 + i % 50 - m * 135;
}
v[m].Setup(gain_calib, pedestal_g0, pedestal_g1, pedestal_g2, 12.4);
}
x.Mode(DetectorMode::Conversion);
logger.Info("JF FP conversion input prepared");
auto start_time = std::chrono::system_clock::now();
for (int z = 0; z < ntries; z++) {
ThreadSafeFIFO<Completion> c;
ThreadSafeFIFO<ProcessWorkRequest> wr;
ProcessJFPacket process(c, wr, nmodules);
if (conversion) {
for (int i = 0; i < nmodules; i++)
process.RegisterConversion(i, &v[i]);
}
for (uint32_t i = 0; i < nmodules * nframes; i++)
wr.Put(ProcessWorkRequest{
.ptr = output.data() + i * RAW_MODULE_SIZE,
.handle = i
});
{
std::vector<std::future<void>> futures;
for (int i = 0; i < nthreads; i++)
futures.emplace_back(std::async(std::launch::async, &process_thread, &packets, &process, nthreads, i));
}
for (auto &packet: packets)
process.ProcessPacket(&packet.jf, packet.ipv4_header_sour_ip);
}
auto end_time = std::chrono::system_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
@@ -245,18 +208,6 @@ int main () {
logger.Info("Fixed point conversion (with geom)");
test_conversion_with_geom<JFConversionFixedPoint>(logger);
logger.Info("Packet processing with conversion (1 thread)");
test_packet_processing(logger, 1, true);
logger.Info("Packet processing with conversion (2 threads)");
test_packet_processing(logger, 2, true);
logger.Info("Packet processing with conversion (4 threads)");
test_packet_processing(logger, 4, true);
logger.Info("Packet processing with conversion (8 threads)");
test_packet_processing(logger, 8, true);
logger.Info("Packet processing without conversion");
test_packet_processing(logger, 1, false);
test_packet_processing(logger);
}