Files
Jungfraujoch/receiver/host/ProcessJFPacket.cpp

78 lines
3.0 KiB
C++

// Copyright (2019-2022) Paul Scherrer Institute
// SPDX-License-Identifier: GPL-3.0-or-later
#include <cstring>
#include "ProcessJFPacket.h"
#include "RawJFUDPPacket.h"
#include "../common/JFJochException.h"
ProcessJFPacket::ProcessJFPacket(ThreadSafeFIFO<Completion> &in_c, ThreadSafeFIFO<ProcessWorkRequest> &in_wr,
uint32_t nmodules, JFConversion *in_conversion)
: c_fifo(in_c),
wr_fifo(in_wr),
module_info(2 * nmodules),
conversion(in_conversion) {
for (auto &i: module_info)
i.c.frame_number = UINT64_MAX;
}
ProcessJFPacket::~ProcessJFPacket() {
for (auto &i: module_info) {
if (i.c.frame_number != UINT64_MAX)
c_fifo.Put(i.c);
}
}
bool ProcessJFPacket::ProcessPacket(jf_payload *datagram, uint32_t src_ip) {
if (datagram->framenum == 0)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Frame number cannot be zero");
uint64_t frame_number = datagram->framenum - 1;
uint32_t ipv4_src_addr_host = src_ip >> 24;
uint64_t module_number = (ipv4_src_addr_host % 32) / 2;
bool second_half_module = (ipv4_src_addr_host % 2 == 1);
uint32_t packetnum = datagram->packetnum | (second_half_module ? 64 : 0);
uint64_t counter = (module_number * 2) | (frame_number % 2);
if (counter > module_info.size())
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Module number out of bounds");
{
std::shared_lock<std::shared_mutex> sl(m);
if (module_info[counter].c.frame_number != frame_number) {
sl.unlock();
{
std::unique_lock<std::shared_mutex> ul(m);
if (module_info[counter].c.frame_number != UINT64_MAX)
c_fifo.Put(module_info[counter].c);
auto wr = wr_fifo.GetBlocking();
module_info[counter].c.type = Completion::Type::Image;
module_info[counter].c.frame_number = frame_number;
module_info[counter].c.timestamp = datagram->timestamp;
module_info[counter].c.bunchid = datagram->bunchid;
module_info[counter].c.debug = datagram->debug;
module_info[counter].c.packet_mask[0] = 0;
module_info[counter].c.packet_mask[1] = 0;
module_info[counter].c.packet_count = 0;
module_info[counter].c.module = module_number;
module_info[counter].c.handle = wr.handle;
module_info[counter].ptr = wr.ptr;
}
sl.lock();
}
module_info[counter].c.packet_count++;
module_info[counter].c.packet_mask[packetnum >= 64 ? 1 : 0] |= (1LU << (packetnum % 64));
if (conversion)
conversion->ConvertPacket((int16_t *) (module_info[counter].ptr + 4096 * packetnum), datagram->data,
packetnum);
else
memcpy(module_info[counter].ptr + 4096 * packetnum, datagram->data, 4096 * sizeof(uint16_t));
}
return true;
}