diff --git a/receiver/host/CMakeLists.txt b/receiver/host/CMakeLists.txt index b01bff36..908635f1 100644 --- a/receiver/host/CMakeLists.txt +++ b/receiver/host/CMakeLists.txt @@ -10,9 +10,9 @@ ADD_LIBRARY(JungfraujochHost STATIC MakeAcquisitionDevice.cpp MakeAcquisitionDevice.h AcquisitionOfflineCounters.cpp AcquisitionOfflineCounters.h IBWrappers.cpp IBWrappers.h - ProcessRawPacket.cpp ProcessRawPacket.h + ProcessJFPacket.cpp ProcessJFPacket.h RawEthDevice.cpp RawEthDevice.h - RawJFUDPPacket.h) + RawJFUDPPacket.h UdpReceiver.cpp UdpReceiver.h IBReceiver.cpp IBReceiver.h) TARGET_LINK_LIBRARIES(JungfraujochHost CommonFunctions HLSSimulation ${IBVERBS} JFCalibration) diff --git a/receiver/host/IBReceiver.cpp b/receiver/host/IBReceiver.cpp new file mode 100644 index 00000000..98580122 --- /dev/null +++ b/receiver/host/IBReceiver.cpp @@ -0,0 +1,4 @@ +// Copyright (2019-2022) Paul Scherrer Institute +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "IBReceiver.h" diff --git a/receiver/host/IBReceiver.h b/receiver/host/IBReceiver.h new file mode 100644 index 00000000..a6803707 --- /dev/null +++ b/receiver/host/IBReceiver.h @@ -0,0 +1,13 @@ +// Copyright (2019-2022) Paul Scherrer Institute +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef JUNGFRAUJOCH_IBRECEIVER_H +#define JUNGFRAUJOCH_IBRECEIVER_H + + +class IBReceiver { + +}; + + +#endif //JUNGFRAUJOCH_IBRECEIVER_H diff --git a/receiver/host/ProcessRawPacket.cpp b/receiver/host/ProcessJFPacket.cpp similarity index 65% rename from receiver/host/ProcessRawPacket.cpp rename to receiver/host/ProcessJFPacket.cpp index 5454ce3b..5dc0e143 100644 --- a/receiver/host/ProcessRawPacket.cpp +++ b/receiver/host/ProcessJFPacket.cpp @@ -3,12 +3,12 @@ #include -#include "ProcessRawPacket.h" +#include "ProcessJFPacket.h" #include "RawJFUDPPacket.h" #include "../common/JFJochException.h" -ProcessRawPacket::ProcessRawPacket(ThreadSafeFIFO &in_c, ThreadSafeFIFO &in_wr, - uint32_t nmodules, JFConversion *in_conversion) +ProcessJFPacket::ProcessJFPacket(ThreadSafeFIFO &in_c, ThreadSafeFIFO &in_wr, + uint32_t nmodules, JFConversion *in_conversion) : c_fifo(in_c), wr_fifo(in_wr), module_info(2 * nmodules), @@ -17,21 +17,21 @@ ProcessRawPacket::ProcessRawPacket(ThreadSafeFIFO &in_c, ThreadSafeF i.c.frame_number = UINT64_MAX; } -ProcessRawPacket::~ProcessRawPacket() { +ProcessJFPacket::~ProcessJFPacket() { for (auto &i: module_info) { if (i.c.frame_number != UINT64_MAX) c_fifo.Put(i.c); } } -bool ProcessRawPacket::ProcessPacket(jf_payload &datagram, uint32_t src_ip) { - if (datagram.framenum == 0) +bool ProcessJFPacket::ProcessPacket(jf_payload *datagram, uint32_t src_ip) { + if (datagram->framenum == 0) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Frame number cannot be zero"); - uint64_t frame_number = datagram.framenum - 1; + uint64_t frame_number = datagram->framenum - 1; uint32_t ipv4_src_addr_host = src_ip >> 24; uint64_t module_number = (ipv4_src_addr_host % 32) / 2; bool second_half_module = (ipv4_src_addr_host % 2 == 1); - uint32_t packetnum = datagram.packetnum | (second_half_module ? 64 : 0); + uint32_t packetnum = datagram->packetnum | (second_half_module ? 64 : 0); uint64_t counter = (module_number * 2) | (frame_number % 2); if (counter > module_info.size()) @@ -45,9 +45,9 @@ bool ProcessRawPacket::ProcessPacket(jf_payload &datagram, uint32_t src_ip) { module_info[counter].c.type = Completion::Type::Image; module_info[counter].c.frame_number = frame_number; - module_info[counter].c.timestamp = datagram.timestamp; - module_info[counter].c.bunchid = datagram.bunchid; - module_info[counter].c.debug = datagram.debug; + module_info[counter].c.timestamp = datagram->timestamp; + module_info[counter].c.bunchid = datagram->bunchid; + module_info[counter].c.debug = datagram->debug; module_info[counter].c.packet_mask[0] = 0; module_info[counter].c.packet_mask[1] = 0; module_info[counter].c.packet_count = 0; @@ -62,11 +62,9 @@ bool ProcessRawPacket::ProcessPacket(jf_payload &datagram, uint32_t src_ip) { module_info[counter].c.packet_mask[packetnum >= 64 ? 1 : 0] |= (1LU << (packetnum % 64)); if (conversion) - conversion->ConvertPacket( (int16_t *) (module_info[counter].ptr + 4096 * packetnum), datagram.data, - packetnum); + conversion->ConvertPacket((int16_t *) (module_info[counter].ptr + 4096 * packetnum), datagram->data, packetnum); else - memcpy((int16_t *) (module_info[counter].ptr + 4096 * packetnum), datagram.data, - 4096 * sizeof(uint16_t)); + memcpy(module_info[counter].ptr + 4096 * packetnum, datagram->data, 4096 * sizeof(uint16_t)); return true; } diff --git a/receiver/host/ProcessRawPacket.h b/receiver/host/ProcessJFPacket.h similarity index 57% rename from receiver/host/ProcessRawPacket.h rename to receiver/host/ProcessJFPacket.h index 4540a483..91651fa3 100644 --- a/receiver/host/ProcessRawPacket.h +++ b/receiver/host/ProcessJFPacket.h @@ -1,8 +1,8 @@ // Copyright (2019-2022) Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-or-later -#ifndef JUNGFRAUJOCH_PROCESSRAWPACKET_H -#define JUNGFRAUJOCH_PROCESSRAWPACKET_H +#ifndef JUNGFRAUJOCH_PROCESSJFPACKET_H +#define JUNGFRAUJOCH_PROCESSJFPACKET_H #include "../../common/ThreadSafeFIFO.h" #include "Completion.h" @@ -19,17 +19,17 @@ struct ModuleInfo { Completion c; }; -class ProcessRawPacket { +class ProcessJFPacket { ThreadSafeFIFO &c_fifo; ThreadSafeFIFO &wr_fifo; JFConversion *conversion; std::vector module_info; public: - ProcessRawPacket(ThreadSafeFIFO &c, ThreadSafeFIFO &wr, uint32_t nmodules, - JFConversion *conversion = nullptr); - ~ProcessRawPacket(); - bool ProcessPacket(jf_payload &datagram, uint32_t src_ip); + ProcessJFPacket(ThreadSafeFIFO &c, ThreadSafeFIFO &wr, uint32_t nmodules, + JFConversion *conversion = nullptr); + ~ProcessJFPacket(); + bool ProcessPacket(jf_payload *datagram, uint32_t src_ip); }; -#endif //JUNGFRAUJOCH_PROCESSRAWPACKET_H +#endif //JUNGFRAUJOCH_PROCESSJFPACKET_H diff --git a/receiver/host/RawEthDevice.h b/receiver/host/RawEthDevice.h index 60658834..1ec8990a 100644 --- a/receiver/host/RawEthDevice.h +++ b/receiver/host/RawEthDevice.h @@ -6,7 +6,7 @@ #include "AcquisitionDevice.h" #include "IBWrappers.h" -#include "ProcessRawPacket.h" +#include "ProcessJFPacket.h" class RawEthDevice : public AcquisitionDevice { IBContext context; diff --git a/receiver/host/UdpReceiver.cpp b/receiver/host/UdpReceiver.cpp new file mode 100644 index 00000000..6f47e678 --- /dev/null +++ b/receiver/host/UdpReceiver.cpp @@ -0,0 +1,66 @@ +// Copyright (2019-2022) Paul Scherrer Institute +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "UdpReceiver.h" + +#include +#include + +#include "../common/JFJochException.h" + +UdpReceiver::UdpReceiver(uint16_t udp_port_number, ProcessJFPacket &process) { + receiver = std::async(std::launch::async, &UdpReceiver::Run, this, udp_port_number, &process); +} + +UdpReceiver::~UdpReceiver() { + cancel = true; + if (receiver.valid()) + receiver.get(); +} + +uint64_t UdpReceiver::Finalize() { + cancel = true; + if (receiver.valid()) + return receiver.get(); + else + return 0; +} + +uint64_t UdpReceiver::Run(uint16_t udp_port_number, ProcessJFPacket *process) { + int fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd < 0) + throw JFJochException(JFJochExceptionCategory::UDPError, "Cannot create UDP socket"); + + timeval timeout{ + .tv_sec = 0, + .tv_usec = 10 + }; + + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) <= 0) + throw JFJochException(JFJochExceptionCategory::UDPError, "Cannot set socket timeout"); + + sockaddr_in server_addr { + .sin_family = AF_INET, + .sin_port = htons(udp_port_number), + .sin_addr = { + .s_addr = INADDR_ANY + } + }; + + if (bind(fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) <= 0) + throw JFJochException(JFJochExceptionCategory::UDPError, "Cannot bind to UDP port"); + + char buffer[9000]; + sockaddr_storage src_addr; + socklen_t src_addr_len=sizeof(src_addr); + + while (!cancel) { + ssize_t count = recvfrom(fd, buffer, sizeof(buffer), 0, (struct sockaddr *) &src_addr, &src_addr_len); + if (count == sizeof(jf_payload)) + process->ProcessPacket((jf_payload *) &buffer, src_addr); + else if ((errno != EAGAIN) && (count == -1)) + throw JFJochException(JFJochExceptionCategory::UDPError, "Cannot bind to UDP port"); + } + + close(fd); +} \ No newline at end of file diff --git a/receiver/host/UdpReceiver.h b/receiver/host/UdpReceiver.h new file mode 100644 index 00000000..e3f67230 --- /dev/null +++ b/receiver/host/UdpReceiver.h @@ -0,0 +1,21 @@ +// Copyright (2019-2022) Paul Scherrer Institute +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef JUNGFRAUJOCH_UDPRECEIVER_H +#define JUNGFRAUJOCH_UDPRECEIVER_H + +#include +#include "ProcessJFPacket.h" + +class UdpReceiver { + std::future receiver; + bool cancel = false; + uint64_t Run(uint16_t udp_port_number, ProcessJFPacket *process); +public: + UdpReceiver(uint16_t udp_port_number, ProcessJFPacket &process); + ~UdpReceiver(); + uint64_t Finalize(); +}; + + +#endif //JUNGFRAUJOCH_UDPRECEIVER_H diff --git a/tests/ProcessRawPacketTest.cpp b/tests/ProcessRawPacketTest.cpp index deff137c..2ab2d152 100644 --- a/tests/ProcessRawPacketTest.cpp +++ b/tests/ProcessRawPacketTest.cpp @@ -3,7 +3,7 @@ #include -#include "../receiver/host/ProcessRawPacket.h" +#include "../receiver/host/ProcessJFPacket.h" #include "../receiver/host/RawJFUDPPacket.h" #include "../common/DiffractionExperiment.h" @@ -11,7 +11,7 @@ TEST_CASE("ProcessRawPacketTest_Empty") { ThreadSafeFIFO c_fifo; ThreadSafeFIFO wr_fifo; { - ProcessRawPacket process(c_fifo, wr_fifo, 2); + ProcessJFPacket process(c_fifo, wr_fifo, 2); } REQUIRE(c_fifo.Size() == 0); } @@ -29,7 +29,7 @@ TEST_CASE("ProcessRawPacketTest") { DiffractionExperiment experiment(2, {4,4}); { - ProcessRawPacket process(c_fifo, wr_fifo, 4); + ProcessJFPacket process(c_fifo, wr_fifo, 4); jf_payload datagram;