diff --git a/jungfrau/ProcessJFPacket.cpp b/jungfrau/ProcessJFPacket.cpp index 279b5730..11b3f5b2 100644 --- a/jungfrau/ProcessJFPacket.cpp +++ b/jungfrau/ProcessJFPacket.cpp @@ -12,8 +12,7 @@ ProcessJFPacket::ProcessJFPacket(ThreadSafeFIFO &in_c, ThreadSafeFIF : m(2 * nmodules), c_fifo(in_c), wr_fifo(in_wr), - module_info(2 * nmodules), - conversion(nmodules, nullptr) + module_info(2 * nmodules) { for (auto &i: module_info) i.c.frame_number = UINT64_MAX; @@ -26,11 +25,6 @@ ProcessJFPacket::~ProcessJFPacket() { } } -void ProcessJFPacket::RegisterConversion(uint32_t module_number, JFConversion *conv) { - do_conversion = true; - conversion.at(module_number) = conv; -} - void ProcessJFPacket::ProcessPacket(jf_udp_payload *datagram, uint32_t src_ip) { if (datagram->framenum == 0) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Frame number cannot be zero"); @@ -40,56 +34,41 @@ void ProcessJFPacket::ProcessPacket(jf_udp_payload *datagram, uint32_t src_ip) { uint32_t module_number = (ipv4_src_addr_host % 32) / 2; bool second_half_module = (ipv4_src_addr_host % 2 == 1); uint32_t packetnum = datagram->packetnum | (second_half_module ? 64 : 0); - uint64_t counter = (module_number * 2) | (frame_number % 2); + uint64_t module_info_location = (module_number * 2) | (frame_number % 2); - if (counter > module_info.size()) + if (module_info_location > module_info.size()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Module number out of bounds"); { - std::shared_lock sl(m[counter]); - if (module_info[counter].c.frame_number != frame_number) { - sl.unlock(); - { - std::unique_lock ul(m[counter]); - // condition is reevaluated inside unique_lock - if (module_info[counter].c.frame_number != frame_number) { - if (module_info[counter].c.frame_number != UINT64_MAX) - c_fifo.Put(module_info[counter].c); + std::unique_lock ul(m[module_info_location]); + if (module_info[module_info_location].c.frame_number != frame_number) { + if (module_info[module_info_location].c.frame_number != UINT64_MAX) + c_fifo.Put(module_info[module_info_location].c); - auto wr = wr_fifo.GetBlocking(); + auto wr = wr_fifo.GetBlocking(); - module_info[counter].c.type = Completion::Type::Image; - module_info[counter].c.frame_number = frame_number; - module_info[counter].c.timestamp = datagram->timestamp; - module_info[counter].c.bunchid = datagram->bunchid; - module_info[counter].c.debug = datagram->debug; - module_info[counter].c.packet_mask[0] = 0; - module_info[counter].c.packet_mask[1] = 0; - module_info[counter].c.packet_count = 0; - module_info[counter].c.module = module_number; + module_info[module_info_location].c.type = Completion::Type::Image; + module_info[module_info_location].c.frame_number = frame_number; + module_info[module_info_location].c.timestamp = datagram->timestamp; + module_info[module_info_location].c.bunchid = datagram->bunchid; + module_info[module_info_location].c.debug = datagram->debug; + module_info[module_info_location].c.packet_mask[0] = 0; + module_info[module_info_location].c.packet_mask[1] = 0; + module_info[module_info_location].c.packet_count = 0; + module_info[module_info_location].c.module = module_number; - module_info[counter].c.handle = wr.handle; - module_info[counter].ptr = wr.ptr; - } - } - sl.lock(); + module_info[module_info_location].c.handle = wr.handle; + module_info[module_info_location].ptr = wr.ptr; } - module_info[counter].c.packet_count++; - module_info[counter].c.packet_mask[packetnum >= 64 ? 1 : 0] |= (1LU << (packetnum % 64)); + module_info[module_info_location].c.packet_count++; + module_info[module_info_location].c.packet_mask[packetnum >= 64 ? 1 : 0] |= (1LU << (packetnum % 64)); - if (!do_conversion) - memcpy(module_info[counter].ptr + 4096 * packetnum, datagram->data, 4096 * sizeof(uint16_t)); - else { - if (conversion[module_number] == nullptr) - throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Conversion procedure not registered"); - conversion[module_number]->ConvertPacket((int16_t *) (module_info[counter].ptr + 4096 * packetnum), - datagram->data, packetnum); - } + memcpy(module_info[module_info_location].ptr + 4096 * packetnum, datagram->data, 4096 * sizeof(uint16_t)); } - counter++; + packet_counter++; } uint64_t ProcessJFPacket::GetCounter() { - return counter; + return packet_counter; } diff --git a/jungfrau/ProcessJFPacket.h b/jungfrau/ProcessJFPacket.h index 06cd7351..1066330d 100644 --- a/jungfrau/ProcessJFPacket.h +++ b/jungfrau/ProcessJFPacket.h @@ -22,17 +22,14 @@ struct ModuleInfo { }; class ProcessJFPacket { - std::vector m; + std::vector m; ThreadSafeFIFO &c_fifo; ThreadSafeFIFO &wr_fifo; - std::vector conversion; std::vector module_info; - bool do_conversion = false; - std::atomic counter = 0; + std::atomic packet_counter = 0; public: ProcessJFPacket(ThreadSafeFIFO &c, ThreadSafeFIFO &wr, uint32_t nmodules); ~ProcessJFPacket(); - void RegisterConversion(uint32_t module_number, JFConversion* conv); void ProcessPacket(jf_udp_payload *datagram, uint32_t src_ip); uint64_t GetCounter(); }; diff --git a/tests/ProcessRawPacketTest.cpp b/tests/ProcessRawPacketTest.cpp index 373a8466..fc0f72fd 100644 --- a/tests/ProcessRawPacketTest.cpp +++ b/tests/ProcessRawPacketTest.cpp @@ -12,6 +12,8 @@ TEST_CASE("ProcessRawPacketTest_Empty") { ThreadSafeFIFO wr_fifo; { ProcessJFPacket process(c_fifo, wr_fifo, 2); + + REQUIRE(process.GetCounter() == 0); } REQUIRE(c_fifo.Size() == 0); } @@ -50,6 +52,8 @@ TEST_CASE("ProcessRawPacketTest") { datagram.bunchid = 84; datagram.data[0] = 6346; process.ProcessPacket(&datagram, experiment.GetSrcIPv4Address(1,7)); + + REQUIRE(process.GetCounter() == 3); } REQUIRE(c_fifo.Size() == 3); diff --git a/tools/JFCalibrationPerfTest.cpp b/tools/JFCalibrationPerfTest.cpp index bd97119f..97b2664e 100644 --- a/tools/JFCalibrationPerfTest.cpp +++ b/tools/JFCalibrationPerfTest.cpp @@ -151,19 +151,11 @@ template void test_conversion_with_geom(Logger &logger) { ntries * nframes * nmodules * RAW_MODULE_SIZE * sizeof(uint16_t) * 1000 * 1000/ ((double) elapsed.count() * 1024 * 1024 * 1024)); } -void process_thread(std::vector *packets, ProcessJFPacket *process, - uint32_t stride, uint32_t start) { - for (uint32_t i = start; i < packets->size(); i += stride) - process->ProcessPacket(&packets->at(i).jf, packets->at(i).ipv4_header_sour_ip); -} - -void test_packet_processing(Logger &logger, uint32_t nthreads, bool conversion) { +void test_packet_processing(Logger &logger) { size_t nframes = 128; int64_t nmodules = 8; int64_t ntries = 8; - DiffractionExperiment x(1,{nmodules}); - std::vector packets(nframes * nmodules * 128); std::vector output(nframes * nmodules * CONVERTED_MODULE_SIZE); @@ -182,48 +174,19 @@ void test_packet_processing(Logger &logger, uint32_t nthreads, bool conversion) } } - std::vector v(nmodules); - - JFModuleGainCalibration gain_calib = GainCalibrationFromTestFile(); - - for (int m = 0; m < nmodules; m++) { - JFModulePedestal pedestal_g0; - JFModulePedestal pedestal_g1; - JFModulePedestal pedestal_g2; - - for (int i = 0; i < RAW_MODULE_SIZE; i++) { - pedestal_g0.GetPedestal()[i] = 3000 + i % 50 + m * 135; - pedestal_g1.GetPedestal()[i] = 15000 + i % 50 - m * 135; - pedestal_g2.GetPedestal()[i] = 14000 + i % 50 - m * 135; - } - v[m].Setup(gain_calib, pedestal_g0, pedestal_g1, pedestal_g2, 12.4); - } - - x.Mode(DetectorMode::Conversion); - - logger.Info("JF FP conversion input prepared"); auto start_time = std::chrono::system_clock::now(); for (int z = 0; z < ntries; z++) { ThreadSafeFIFO c; ThreadSafeFIFO wr; ProcessJFPacket process(c, wr, nmodules); - if (conversion) { - for (int i = 0; i < nmodules; i++) - process.RegisterConversion(i, &v[i]); - } - for (uint32_t i = 0; i < nmodules * nframes; i++) wr.Put(ProcessWorkRequest{ .ptr = output.data() + i * RAW_MODULE_SIZE, .handle = i }); - - { - std::vector> futures; - for (int i = 0; i < nthreads; i++) - futures.emplace_back(std::async(std::launch::async, &process_thread, &packets, &process, nthreads, i)); - } + for (auto &packet: packets) + process.ProcessPacket(&packet.jf, packet.ipv4_header_sour_ip); } auto end_time = std::chrono::system_clock::now(); auto elapsed = std::chrono::duration_cast(end_time - start_time); @@ -245,18 +208,6 @@ int main () { logger.Info("Fixed point conversion (with geom)"); test_conversion_with_geom(logger); - logger.Info("Packet processing with conversion (1 thread)"); - test_packet_processing(logger, 1, true); - - logger.Info("Packet processing with conversion (2 threads)"); - test_packet_processing(logger, 2, true); - - logger.Info("Packet processing with conversion (4 threads)"); - test_packet_processing(logger, 4, true); - - logger.Info("Packet processing with conversion (8 threads)"); - test_packet_processing(logger, 8, true); - logger.Info("Packet processing without conversion"); - test_packet_processing(logger, 1, false); + test_packet_processing(logger); }