// Copyright (2019-2022) Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-or-later #include #include "ProcessJFPacket.h" #include "RawJFUDPPacket.h" #include "../common/JFJochException.h" ProcessJFPacket::ProcessJFPacket(ThreadSafeFIFO &in_c, ThreadSafeFIFO &in_wr, uint32_t nmodules) : c_fifo(in_c), wr_fifo(in_wr), module_info(2 * nmodules), conversion(nmodules, nullptr) { for (auto &i: module_info) i.c.frame_number = UINT64_MAX; } ProcessJFPacket::~ProcessJFPacket() { for (auto &i: module_info) { if (i.c.frame_number != UINT64_MAX) c_fifo.Put(i.c); } } void ProcessJFPacket::RegisterConversion(uint32_t module_number, JFConversion *conv) { conversion.at(module_number) = conv; } bool ProcessJFPacket::ProcessPacket(jf_payload *datagram, uint32_t src_ip, bool do_conversion) { if (datagram->framenum == 0) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Frame number cannot be zero"); uint64_t frame_number = datagram->framenum - 1; uint32_t ipv4_src_addr_host = src_ip >> 24; uint32_t module_number = (ipv4_src_addr_host % 32) / 2; bool second_half_module = (ipv4_src_addr_host % 2 == 1); uint32_t packetnum = datagram->packetnum | (second_half_module ? 64 : 0); uint64_t counter = (module_number * 2) | (frame_number % 2); if (counter > module_info.size()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Module number out of bounds"); { std::shared_lock sl(m); if (module_info[counter].c.frame_number != frame_number) { sl.unlock(); { std::unique_lock ul(m); if (module_info[counter].c.frame_number != UINT64_MAX) c_fifo.Put(module_info[counter].c); auto wr = wr_fifo.GetBlocking(); module_info[counter].c.type = Completion::Type::Image; module_info[counter].c.frame_number = frame_number; module_info[counter].c.timestamp = datagram->timestamp; module_info[counter].c.bunchid = datagram->bunchid; module_info[counter].c.debug = datagram->debug; module_info[counter].c.packet_mask[0] = 0; module_info[counter].c.packet_mask[1] = 0; module_info[counter].c.packet_count = 0; module_info[counter].c.module = module_number; module_info[counter].c.handle = wr.handle; module_info[counter].ptr = wr.ptr; } sl.lock(); } module_info[counter].c.packet_count++; module_info[counter].c.packet_mask[packetnum >= 64 ? 1 : 0] |= (1LU << (packetnum % 64)); if (!do_conversion) memcpy(module_info[counter].ptr + 4096 * packetnum, datagram->data, 4096 * sizeof(uint16_t)); else { if (conversion[module_number] == nullptr) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Conversion procedure not registered"); conversion[module_number]->ConvertPacket((int16_t *) (module_info[counter].ptr + 4096 * packetnum), datagram->data, packetnum); } } return true; }