diff --git a/CMakeLists.txt b/CMakeLists.txt index aed6e98..44e8243 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,5 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("sf-buffer") -add_subdirectory("sf-replay") add_subdirectory("sf-stream") add_subdirectory("sf-writer") \ No newline at end of file diff --git a/README.md b/README.md index 5811367..50b4124 100644 --- a/README.md +++ b/README.md @@ -124,4 +124,6 @@ In order to unify the way we write code and talk about concept the following terminology definitions should be followed: - frame (data from a single module) -- image (data of the assembled image) \ No newline at end of file +- image (data of the assembled image) +- start_pulse_id and stop_pulse_id (not end_pulse_id) is used to determine the +inclusive range (both start and stop pulse_id are included) of pulses. \ No newline at end of file diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 6eb9f45..57c1344 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -19,7 +19,7 @@ namespace core_buffer { // Must be power of 10 and >= than FILE_MOD. const size_t FOLDER_MOD = 100000; // Extension of our file format. - const std::string FILE_EXTENSION = ".h5"; + const std::string FILE_EXTENSION = ".bin"; // Number of pulses between each statistics print out. const size_t STATS_MODULO = 100; // If the RB is empty, how much time to wait before trying to read it again. @@ -29,9 +29,9 @@ namespace core_buffer { const size_t BUFFER_UDP_N_RECV_MSG = 64; // Size of UDP recv buffer const int BUFFER_UDP_RCVBUF_N_SLOTS = 100; - // +1 for packet headers. + // 8246 bytes for each UDP packet. const int BUFFER_UDP_RCVBUF_BYTES = - (128 * 8246 * BUFFER_UDP_RCVBUF_N_SLOTS); + (128 * BUFFER_UDP_RCVBUF_N_SLOTS * 8246); // Microseconds timeout for UDP recv. const int BUFFER_UDP_US_TIMEOUT = 2 * 1000; // HWM for live stream from buffer. @@ -53,7 +53,7 @@ namespace core_buffer { // Address where Replay streams and writer receives. const std::string REPLAY_STREAM_IPC_URL = "ipc:///tmp/sf-replay-"; // How many frames to read at once from file. - const size_t REPLAY_READ_BUFFER_SIZE = 100; + const size_t BUFFER_BLOCK_SIZE = 100; // How many slots to have in the internal queue between read and send. const int REPLAY_FASTQUEUE_N_SLOTS = 2; // How many frames do we buffer in send. @@ -63,8 +63,8 @@ namespace core_buffer { const int WRITER_RCVHWM = 100; // ZMQ threads for receiving data from sf_replay. const int WRITER_ZMQ_IO_THREADS = 2; - // Size of buffer between the receiving and writing part. - const int WRITER_FASTQUEUE_N_SLOTS = 5; + // MS to retry reading from the image assembler. + const size_t WRITER_IMAGE_ASSEMBLER_RETRY_MS = 5; // How large are metadata chunks in the HDF5. const size_t WRITER_METADATA_CHUNK_N_IMAGES = 100; // How large should the data cache be in N images. diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index e1083ff..8dec7c9 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -4,22 +4,36 @@ #include "buffer_config.hpp" #include "jungfrau.hpp" -struct ImageMetadataBuffer +struct ImageMetadataBlock { - uint64_t pulse_id[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; - uint64_t frame_index[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; - uint32_t daq_rec[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; - uint8_t is_good_image[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; - uint16_t n_images; + uint64_t pulse_id[core_buffer::BUFFER_BLOCK_SIZE]; + uint64_t frame_index[core_buffer::BUFFER_BLOCK_SIZE]; + uint32_t daq_rec[core_buffer::BUFFER_BLOCK_SIZE]; + uint8_t is_good_image[core_buffer::BUFFER_BLOCK_SIZE]; + uint64_t block_start_pulse_id; + uint64_t block_stop_pulse_id; }; +const char BUFFER_FORMAT_START_BYTE = 0xBE; + #pragma pack(push) #pragma pack(1) -struct ReplayBuffer +struct BufferBinaryFormat { + + BufferBinaryFormat() : FORMAT_MARKER(BUFFER_FORMAT_START_BYTE) {}; + + const char FORMAT_MARKER; + ModuleFrame metadata; + char data[core_buffer::MODULE_N_BYTES]; +}; +#pragma pack(pop) + +#pragma pack(push) +#pragma pack(1) +struct BufferBinaryBlock { - ModuleFrame metadata[core_buffer::REPLAY_READ_BUFFER_SIZE]; + BufferBinaryFormat frame[core_buffer::BUFFER_BLOCK_SIZE]; uint64_t start_pulse_id; - uint16_t n_frames; }; #pragma pack(pop) diff --git a/core-buffer/src/FastQueue.cpp b/core-buffer/src/FastQueue.cpp index 1595278..da679e4 100644 --- a/core-buffer/src/FastQueue.cpp +++ b/core-buffer/src/FastQueue.cpp @@ -103,7 +103,7 @@ void FastQueue::release() read_slot_id_ %= n_slots_; } -template class FastQueue; -template class FastQueue; +template class FastQueue; +template class FastQueue; template class FastQueue; template class FastQueue; diff --git a/core-buffer/test/test_FastQueue.cpp b/core-buffer/test/test_FastQueue.cpp index 0d31142..e720c21 100644 --- a/core-buffer/test/test_FastQueue.cpp +++ b/core-buffer/test/test_FastQueue.cpp @@ -145,4 +145,6 @@ TEST(FaseQueue, array_parameter) ASSERT_EQ(module_metadata.n_received_packets, i_module); ASSERT_EQ(module_metadata.module_id, i_module); } -} \ No newline at end of file +} + +// TODO: Test with payload of zero (metadata only). \ No newline at end of file diff --git a/scripts/JF01-buffer-worker.sh b/scripts/JF01-buffer-worker.sh new file mode 100644 index 0000000..b907af5 --- /dev/null +++ b/scripts/JF01-buffer-worker.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +if [ $# != 1 ] +then + systemctl start JF01-buffer-worker@{00..02} + exit +fi + +M=$1 + +# Add ourselves to the user cpuset. +# echo $$ > /sys/fs/cgroup/cpuset/user/tasks + +coreAssociatedBuffer=(12 12 12) + +initialUDPport=50010 +port=$((${initialUDPport}+10#${M})) +DETECTOR=JF01T03V01 + +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF01-buffer-worker@.service b/scripts/JF01-buffer-worker@.service new file mode 100644 index 0000000..b8b6a97 --- /dev/null +++ b/scripts/JF01-buffer-worker@.service @@ -0,0 +1,16 @@ +[Unit] +Description=JF01 UDP2buffer worker instance as a service, instance %i +Requires=JF01-buffer.service +Before=JF01-buffer.service +BindsTo=JF01-buffer.service + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF01-buffer-worker.sh %i +TimeoutStartSec=10 +RestartSec=10 + +[Install] +WantedBy=JF01-buffer.service diff --git a/scripts/JF07-replay.service b/scripts/JF01-buffer.service similarity index 70% rename from scripts/JF07-replay.service rename to scripts/JF01-buffer.service index 288b922..efdc14d 100644 --- a/scripts/JF07-replay.service +++ b/scripts/JF01-buffer.service @@ -1,9 +1,9 @@ [Unit] -Description=All replay instances of JF07 +Description=All UDP-buffer instances of JF01 [Service] Type=oneshot -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF07-replay-worker.sh +ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF01-buffer-worker.sh RemainAfterExit=yes [Install] diff --git a/scripts/JF01-stream.service b/scripts/JF01-stream.service new file mode 100644 index 0000000..79e7b1b --- /dev/null +++ b/scripts/JF01-stream.service @@ -0,0 +1,15 @@ +[Unit] +Description=stream service (to streamvis and live analysis) of JF01 + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF01-stream.sh +TimeoutStartSec=10 +Restart=on-failure +RestartSec=10 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF01-stream.sh b/scripts/JF01-stream.sh new file mode 100644 index 0000000..c7d1724 --- /dev/null +++ b/scripts/JF01-stream.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +coreAssociated="24" + +taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF01.json diff --git a/scripts/JF07-buffer-worker.sh b/scripts/JF07-buffer-worker.sh index e9302fc..5d744e1 100644 --- a/scripts/JF07-buffer-worker.sh +++ b/scripts/JF07-buffer-worker.sh @@ -21,11 +21,7 @@ coreAssociatedBuffer=(4 4 4 4 5 5 5 5 6 6 6 6 7 7 7 7 8 8 8 8 9 9 9 9 10 10 10 1 #coreAssociatedBuffer=(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32) initialUDPport=50100 -# strange that it doesn't work to add 08 or 09 (value too great for base (error token is "09")) port=$((${initialUDPport}+10#${M})) -#port=`expr ${initialUDPport} + ${M}` DETECTOR=JF07T32V01 -echo ${port} -#taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} > /gpfs/photonics/swissfel/buffer/${port}.log -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF07-replay-worker.sh b/scripts/JF07-replay-worker.sh deleted file mode 100644 index bb51829..0000000 --- a/scripts/JF07-replay-worker.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash - -if [ $# != 1 ] -then - systemctl start JF07-replay-worker@{00..32} - exit -fi - -M=$1 - -#8 replay workers per core, last (stream to visualisation) worker occupies 4 -coreAssociated=(20 20 20 20 20 20 20 20 21 21 21 21 21 21 21 21 22 22 22 22 22 22 22 22 23 23 23 23 23 23 23 23 24,25,26,27) - -latest_file=`cat /gpfs/photonics/swissfel/buffer/JF07T32V01/M00/LATEST` -last_pulse_id=`basename ${latest_file} | sed 's/.h5//'` -first_pulse_id=$((${last_pulse_id}-360000)) - -echo "First/last pulse_id : ${first_pulse_id} ${last_pulse_id}" - -if [ ${M} == 32 ] -then -# taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_writer /gpfs/photonics/swissfel/buffer/test.${first_pulse_id}-${last_pulse_id}.h5 ${first_pulse_id} ${last_pulse_id} - taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 10 tcp://192.168.30.29:9107 30 -else - taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_replay /gpfs/photonics/swissfel/buffer/JF07T32V01 M${M} ${M} ${first_pulse_id} ${last_pulse_id} -fi diff --git a/scripts/JF07-replay-worker@.service b/scripts/JF07-replay-worker@.service deleted file mode 100644 index 42e8dda..0000000 --- a/scripts/JF07-replay-worker@.service +++ /dev/null @@ -1,16 +0,0 @@ -[Unit] -Description=JF07 replay worker instance as a service, instance %i -Requires=JF07-replay.service -Before=JF07-replay.service -BindsTo=JF07-replay.service - -[Service] -PermissionsStartOnly=true -Type=idle -User=root -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF07-replay-worker.sh %i -TimeoutStartSec=10 -RestartSec=10 - -[Install] -WantedBy=JF07-replay.service diff --git a/scripts/JF07-stream.sh b/scripts/JF07-stream.sh index 6cbabc1..bcaf1d4 100644 --- a/scripts/JF07-stream.sh +++ b/scripts/JF07-stream.sh @@ -2,4 +2,4 @@ coreAssociated="20,21,22,23" -taskset -c ${coreAssociated} /usr/bin/sf_stream tcp://129.129.241.42:9007 25 tcp://192.168.30.29:9107 10 +taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF07.json diff --git a/scripts/JF13-buffer-worker.sh b/scripts/JF13-buffer-worker.sh new file mode 100644 index 0000000..c269179 --- /dev/null +++ b/scripts/JF13-buffer-worker.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +if [ $# != 1 ] +then + systemctl start JF13-buffer-worker@00 + exit +fi + +M=$1 + +# Add ourselves to the user cpuset. +# echo $$ > /sys/fs/cgroup/cpuset/user/tasks + +coreAssociatedBuffer=(13) + +initialUDPport=50190 +port=$((${initialUDPport}+10#${M})) +DETECTOR=JF13T01V01 + +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF13-buffer-worker@.service b/scripts/JF13-buffer-worker@.service new file mode 100644 index 0000000..0ccd3d5 --- /dev/null +++ b/scripts/JF13-buffer-worker@.service @@ -0,0 +1,16 @@ +[Unit] +Description=JF13 UDP2buffer worker instance as a service, instance %i +Requires=JF13-buffer.service +Before=JF13-buffer.service +BindsTo=JF13-buffer.service + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF13-buffer-worker.sh %i +TimeoutStartSec=10 +RestartSec=10 + +[Install] +WantedBy=JF13-buffer.service diff --git a/scripts/JF13-buffer.service b/scripts/JF13-buffer.service new file mode 100644 index 0000000..2c435ce --- /dev/null +++ b/scripts/JF13-buffer.service @@ -0,0 +1,10 @@ +[Unit] +Description=All UDP-buffer instances of JF13 + +[Service] +Type=oneshot +ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF13-buffer-worker.sh +RemainAfterExit=yes + +[Install] +WantedBy=multi-user.target diff --git a/scripts/JF13-stream.service b/scripts/JF13-stream.service new file mode 100644 index 0000000..7e0a492 --- /dev/null +++ b/scripts/JF13-stream.service @@ -0,0 +1,15 @@ +[Unit] +Description=stream service (to streamvis and live analysis) of JF13 + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF13-stream.sh +TimeoutStartSec=10 +Restart=on-failure +RestartSec=10 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF13-stream.sh b/scripts/JF13-stream.sh new file mode 100644 index 0000000..3b8e226 --- /dev/null +++ b/scripts/JF13-stream.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +coreAssociated="25" + +taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF13.json diff --git a/scripts/delete_old_files_in_buffer.sh b/scripts/delete_old_files_in_buffer.sh new file mode 100755 index 0000000..1d0db5a --- /dev/null +++ b/scripts/delete_old_files_in_buffer.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +hours=5 +threshold=80 + +if [ $# = 1 ] +then + threshold=$1 +fi +if [ $# = 2 ] +then + hours=$2 +fi + +df -h | grep BUFFER > /dev/null +if [ $? != 0 ] +then + # BUFFER is not present + exit +fi + +occupancy=`df -h /gpfs/photonics/swissfel/buffer | grep BUFFER | awk '{print $5}' | sed 's/%//'` +if [ ${occupancy} -lt ${threshold} ] +then +# echo OK, not action + exit +fi + +#find /gpfs/photonics/swissfel/buffer/JF* -type f -mmin +$((${hours}*60)) -delete +find /gpfs/photonics/swissfel/buffer/JF*/M* -type d -mmin +$((${hours}*60)) -delete diff --git a/scripts/start_detector.sh b/scripts/start_detector.sh new file mode 100755 index 0000000..482d7d4 --- /dev/null +++ b/scripts/start_detector.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +if [ $# -lt 1 ] +then + echo "Usage : $0 DETECTOR_NAME " + echo " DETECTOR_NAME: JF07 or JF01..." + echo " number_of_cycles : optional, default 100" + exit +fi + +DETECTOR=$1 +case ${DETECTOR} in +'JF01') + D=1 + ;; +'JF07') + D=7 + ;; +'JF13') + D=13 + ;; +*) + echo "Unsupported detector" + exit + ;; +esac + +n_cycles=100 +if [ $# == 2 ] +then + n_cycles=$2 +fi + +export PATH=/home/dbe/miniconda3/bin:$PATH +source deactivate +source activate dia + +sls_detector_put ${D}-timing trigger +sls_detector_put ${D}-cycles ${n_cycles} +sls_detector_put ${D}-exptime 5e-06 +sls_detector_put ${D}-frames 1 +sls_detector_put ${D}-dr 16 +#sls_detector_put ${D}-clearbit to 0x5d 0 # normal mode, not highG0 +sls_detector_put ${D}-status start + +echo "Now start trigger" diff --git a/sf-buffer/CMakeLists.txt b/sf-buffer/CMakeLists.txt index c4c74ed..a47f0b7 100644 --- a/sf-buffer/CMakeLists.txt +++ b/sf-buffer/CMakeLists.txt @@ -17,4 +17,4 @@ target_link_libraries(sf-buffer hdf5_cpp) enable_testing() -add_subdirectory(test/) \ No newline at end of file +add_subdirectory(test/) diff --git a/sf-buffer/include/BufferBinaryFormat.hpp b/sf-buffer/include/BufferBinaryFormat.hpp deleted file mode 100644 index d9f72cd..0000000 --- a/sf-buffer/include/BufferBinaryFormat.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef JFFILEFORMAT_HPP -#define JFFILEFORMAT_HPP - -#include "jungfrau.hpp" - -const char JF_FORMAT_START_BYTE = 0xBE; - -#pragma pack(push) -#pragma pack(1) -struct BufferBinaryFormat { - - BufferBinaryFormat() : FORMAT_MARKER(JF_FORMAT_START_BYTE) {}; - - const char FORMAT_MARKER; - uint64_t pulse_id; - uint64_t frame_id; - uint32_t daq_rec; - uint16_t n_recv_packets; - char data[JUNGFRAU_DATA_BYTES_PER_FRAME]; -}; -#pragma pack(pop) - -#endif // JFFILEFORMAT_HPP \ No newline at end of file diff --git a/sf-buffer/include/BufferBinaryWriter.hpp b/sf-buffer/include/BufferBinaryWriter.hpp index ee90afb..e401236 100644 --- a/sf-buffer/include/BufferBinaryWriter.hpp +++ b/sf-buffer/include/BufferBinaryWriter.hpp @@ -2,12 +2,13 @@ #define BINARYWRITER_HPP #include -#include "BufferBinaryFormat.hpp" + +#include "formats.hpp" class BufferBinaryWriter { - const std::string device_name_; const std::string root_folder_; + const std::string device_name_; std::string latest_filename_; std::string current_output_filename_; @@ -19,8 +20,8 @@ class BufferBinaryWriter { public: BufferBinaryWriter( - const std::string& device_name, - const std::string& root_folder); + const std::string& root_folder, + const std::string& device_name); virtual ~BufferBinaryWriter(); diff --git a/sf-buffer/include/BufferH5Writer.hpp b/sf-buffer/include/BufferH5Writer.hpp index a4f61b2..922d4df 100644 --- a/sf-buffer/include/BufferH5Writer.hpp +++ b/sf-buffer/include/BufferH5Writer.hpp @@ -6,9 +6,10 @@ #include #include #include -#include "jungfrau.hpp" #include -#include + +#include "jungfrau.hpp" +#include "buffer_config.hpp" class BufferH5Writer { diff --git a/sf-buffer/src/BufferBinaryWriter.cpp b/sf-buffer/src/BufferBinaryWriter.cpp index 750ef76..6e189a6 100644 --- a/sf-buffer/src/BufferBinaryWriter.cpp +++ b/sf-buffer/src/BufferBinaryWriter.cpp @@ -1,21 +1,23 @@ #include "BufferBinaryWriter.hpp" + #include #include #include "date.h" #include #include #include -#include #include -#include + +#include "BufferUtils.hpp" +#include "WriterUtils.hpp" using namespace std; BufferBinaryWriter::BufferBinaryWriter( - const string& device_name, - const string& root_folder) : - device_name_(device_name), + const string& root_folder, + const string& device_name): root_folder_(root_folder), + device_name_(device_name), latest_filename_(root_folder + "/" + device_name + "/LATEST"), current_output_filename_(""), output_file_fd_(-1) @@ -27,7 +29,9 @@ BufferBinaryWriter::~BufferBinaryWriter() close_current_file(); } -void BufferBinaryWriter::write(uint64_t pulse_id, const BufferBinaryFormat* buffer) +void BufferBinaryWriter::write( + uint64_t pulse_id, + const BufferBinaryFormat* buffer) { auto current_frame_file = BufferUtils::get_filename(root_folder_, device_name_, pulse_id); @@ -37,7 +41,8 @@ void BufferBinaryWriter::write(uint64_t pulse_id, const BufferBinaryFormat* buff } size_t n_bytes_offset = - BufferUtils::get_file_frame_index(pulse_id) * sizeof(BufferBinaryFormat); + BufferUtils::get_file_frame_index(pulse_id) * + sizeof(BufferBinaryFormat); auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET); if (lseek_result < 0) { @@ -118,7 +123,7 @@ void BufferBinaryWriter::close_current_file() BufferUtils::update_latest_file( latest_filename_, current_output_filename_); - } - current_output_filename_ = ""; + current_output_filename_ = ""; + } } \ No newline at end of file diff --git a/sf-buffer/src/BufferH5Writer.cpp b/sf-buffer/src/BufferH5Writer.cpp index ed7ce39..c2706ec 100644 --- a/sf-buffer/src/BufferH5Writer.cpp +++ b/sf-buffer/src/BufferH5Writer.cpp @@ -84,7 +84,8 @@ void BufferH5Writer::set_pulse_id(const uint64_t pulse_id) if (h5_file_.getId() != -1) { auto latest_filename = output_filename_; close_file(); - BufferUtils::update_latest_file(LATEST_filename_, latest_filename); + BufferUtils::update_latest_file( + LATEST_filename_, latest_filename); } WriterUtils::create_destination_folder(new_output_filename); diff --git a/sf-buffer/src/WriterUtils.cpp b/sf-buffer/src/WriterUtils.cpp index 7653bee..9b8d51a 100644 --- a/sf-buffer/src/WriterUtils.cpp +++ b/sf-buffer/src/WriterUtils.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "WriterUtils.hpp" #include "date.h" @@ -41,8 +42,10 @@ void WriterUtils::create_destination_folder(const string& output_file) auto file_separator_index = output_file.rfind('/'); if (file_separator_index != string::npos) { + string output_folder(output_file.substr(0, file_separator_index)); + // TODO: filesystem::create_directories(output_folder) string create_folder_command("mkdir -p " + output_folder); system(create_folder_command.c_str()); } diff --git a/sf-buffer/src/main.cpp b/sf-buffer/src/main.cpp index 22965ed..a3bf997 100644 --- a/sf-buffer/src/main.cpp +++ b/sf-buffer/src/main.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include "zmq.h" #include "buffer_config.hpp" #include "jungfrau.hpp" @@ -11,16 +11,18 @@ #include #include #include +#include "formats.hpp" using namespace std; using namespace core_buffer; int main (int argc, char *argv[]) { - if (argc != 5) { + if (argc != 6) { cout << endl; - cout << "Usage: sf_buffer [device_name] [udp_port] [root_folder]"; + cout << "Usage: sf_buffer [detector_name] [device_name] [udp_port] [root_folder]"; cout << "[source_id]"; cout << endl; + cout << "\tdetector_name: Detector name, example JF07T32V01" << endl; cout << "\tdevice_name: Name to write to disk."; cout << "\tudp_port: UDP port to connect to." << endl; cout << "\troot_folder: FS root folder." << endl; @@ -30,13 +32,15 @@ int main (int argc, char *argv[]) { exit(-1); } - string device_name = string(argv[1]); - int udp_port = atoi(argv[2]); - string root_folder = string(argv[3]); - int source_id = atoi(argv[4]); + string detector_name = string(argv[1]); + string device_name = string(argv[2]); + int udp_port = atoi(argv[3]); + string root_folder = string(argv[4]); + int source_id = atoi(argv[5]); stringstream ipc_stream; - ipc_stream << BUFFER_LIVE_IPC_URL << source_id; + string LIVE_IPC_URL = BUFFER_LIVE_IPC_URL + detector_name + "-"; + ipc_stream << LIVE_IPC_URL << source_id; const auto ipc_address = ipc_stream.str(); auto ctx = zmq_ctx_new(); @@ -59,7 +63,7 @@ int main (int argc, char *argv[]) { uint64_t n_corrupted_frames = 0; uint64_t last_pulse_id = 0; - BufferH5Writer writer(root_folder, device_name); + BufferBinaryWriter writer(root_folder, device_name); BufferUdpReceiver receiver(udp_port, source_id); pid_t tid; @@ -67,8 +71,7 @@ int main (int argc, char *argv[]) { int ret = setpriority(PRIO_PROCESS, tid, 0); if (ret == -1) throw runtime_error("cannot set nice"); - ModuleFrame metadata; - auto frame_buffer = new char[MODULE_N_BYTES * JUNGFRAU_N_MODULES]; + BufferBinaryFormat* binary_buffer = new BufferBinaryFormat(); size_t write_total_us = 0; size_t write_max_us = 0; @@ -77,12 +80,12 @@ int main (int argc, char *argv[]) { while (true) { - auto pulse_id = receiver.get_frame_from_udp(metadata, frame_buffer); + auto pulse_id = receiver.get_frame_from_udp( + binary_buffer->metadata, binary_buffer->data); auto start_time = chrono::steady_clock::now(); - writer.set_pulse_id(pulse_id); - writer.write(&metadata, frame_buffer); + writer.write(pulse_id, binary_buffer); auto write_end_time = chrono::steady_clock::now(); auto write_us_duration = chrono::duration_cast( @@ -90,8 +93,9 @@ int main (int argc, char *argv[]) { start_time = chrono::steady_clock::now(); - zmq_send(socket, &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); - zmq_send(socket, frame_buffer, MODULE_N_BYTES, 0); + zmq_send(socket, &(binary_buffer->metadata), sizeof(ModuleFrame), + ZMQ_SNDMORE); + zmq_send(socket, binary_buffer->data, MODULE_N_BYTES, 0); auto send_end_time = chrono::steady_clock::now(); auto send_us_duration = chrono::duration_cast( @@ -110,9 +114,9 @@ int main (int argc, char *argv[]) { send_max_us = send_us_duration; } - if (metadata.n_received_packets < JF_N_PACKETS_PER_FRAME) { - n_missed_packets += - JF_N_PACKETS_PER_FRAME - metadata.n_received_packets; + if (binary_buffer->metadata.n_received_packets < JF_N_PACKETS_PER_FRAME) { + n_missed_packets += JF_N_PACKETS_PER_FRAME - + binary_buffer->metadata.n_received_packets; n_corrupted_frames++; } @@ -146,5 +150,5 @@ int main (int argc, char *argv[]) { } } - delete[] frame_buffer; + delete binary_buffer; } diff --git a/sf-replay/CMakeLists.txt b/sf-replay/CMakeLists.txt deleted file mode 100644 index 51b15c6..0000000 --- a/sf-replay/CMakeLists.txt +++ /dev/null @@ -1,21 +0,0 @@ -file(GLOB SOURCES - src/*.cpp) - -add_library(sf-replay-lib STATIC ${SOURCES}) -target_include_directories(sf-replay-lib PUBLIC include/) -target_link_libraries(sf-replay-lib - external - core-buffer-lib) - -add_executable(sf-replay src/main.cpp) -set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay) -target_link_libraries(sf-replay - core-buffer-lib - sf-replay-lib - zmq - hdf5 - hdf5_cpp - pthread) - -enable_testing() -add_subdirectory(test/) \ No newline at end of file diff --git a/sf-replay/include/ReplayH5Reader.hpp b/sf-replay/include/ReplayH5Reader.hpp deleted file mode 100644 index 009a028..0000000 --- a/sf-replay/include/ReplayH5Reader.hpp +++ /dev/null @@ -1,34 +0,0 @@ -#ifndef SF_DAQ_BUFFER_REPLAYH5READER_HPP -#define SF_DAQ_BUFFER_REPLAYH5READER_HPP - -#include -#include -#include - -#include "formats.hpp" - -class ReplayH5Reader { - - const std::string device_; - const std::string channel_name_; - - H5::H5File current_file_; - std::string current_filename_; - H5::DataSet dset_metadata_; - H5::DataSet dset_frame_; - -public: - ReplayH5Reader( - const std::string device, - const std::string channel_name); - virtual ~ReplayH5Reader(); - - void close_file(); - void get_buffer( - const uint64_t pulse_id, - ReplayBuffer* metadata, - char* frame_buffer); -}; - - -#endif //SF_DAQ_BUFFER_REPLAYH5READER_HPP diff --git a/sf-replay/include/ReplayZmqSender.hpp b/sf-replay/include/ReplayZmqSender.hpp deleted file mode 100644 index 845c9f0..0000000 --- a/sf-replay/include/ReplayZmqSender.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef SF_DAQ_BUFFER_REPLAYZMQSENDER_HPP -#define SF_DAQ_BUFFER_REPLAYZMQSENDER_HPP - -#include -#include -#include - -class ReplayZmqSender { - - void* ctx_; - void* socket_; - -public: - ReplayZmqSender(const std::string& ipc_id, const int source_id); - virtual ~ReplayZmqSender(); - - void close(); - - void send(const ReplayBuffer* metadata, const char* data); -}; - - -#endif //SF_DAQ_BUFFER_REPLAYZMQSENDER_HPP diff --git a/sf-replay/src/ReplayH5Reader.cpp b/sf-replay/src/ReplayH5Reader.cpp deleted file mode 100644 index 9a126ab..0000000 --- a/sf-replay/src/ReplayH5Reader.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include "ReplayH5Reader.hpp" - -#include "BufferUtils.hpp" - -using namespace std; -using namespace core_buffer; - -ReplayH5Reader::ReplayH5Reader( - const string device, - const string channel_name) : - device_(device), - channel_name_(channel_name) -{ -} - -ReplayH5Reader::~ReplayH5Reader() -{ - close_file(); -} - -void ReplayH5Reader::close_file() -{ - if (current_file_.getId() != -1) { - dset_metadata_.close(); - dset_frame_.close(); - current_file_.close(); - } -} - -void ReplayH5Reader::get_buffer( - const uint64_t pulse_id, - ReplayBuffer* metadata, - char* data) -{ - auto pulse_filename = BufferUtils::get_filename( - device_, channel_name_, pulse_id); - - if (pulse_filename != current_filename_) { - close_file(); - - current_filename_ = pulse_filename; - current_file_ = H5::H5File(current_filename_, H5F_ACC_RDONLY); - - dset_metadata_ = current_file_.openDataSet(BUFFER_H5_METADATA_DATASET); - dset_frame_ = current_file_.openDataSet(BUFFER_H5_FRAME_DATASET); - } - - auto file_index = BufferUtils::get_file_frame_index(pulse_id); - auto cache_start_index = file_index / REPLAY_READ_BUFFER_SIZE; - cache_start_index *= REPLAY_READ_BUFFER_SIZE; - - uint64_t b_start_pulse_id = pulse_id - (file_index - cache_start_index); - metadata->start_pulse_id = b_start_pulse_id; - metadata->n_frames = REPLAY_READ_BUFFER_SIZE; - - hsize_t b_m_dims[2] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS}; - H5::DataSpace b_m_space (2, b_m_dims); - hsize_t b_m_count[] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS}; - hsize_t b_m_start[] = {cache_start_index, 0}; - b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); - - hsize_t f_m_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS}; - H5::DataSpace f_m_space (2, f_m_dims); - hsize_t f_m_count[] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS}; - hsize_t pulse_id_start[] = {cache_start_index, 0}; - f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, pulse_id_start); - - dset_metadata_.read( - &metadata->metadata[0], - H5::PredType::NATIVE_UINT64, b_m_space, f_m_space); - - hsize_t b_f_dims[3] = - {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DataSpace b_f_space (3, b_f_dims); - hsize_t b_f_count[] = - {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; - hsize_t b_f_start[] = {0, 0, 0}; - b_f_space.selectHyperslab(H5S_SELECT_SET, b_f_count, b_f_start); - - hsize_t f_frame_dims[3] = {FILE_MOD, MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DataSpace f_f_space (3, f_frame_dims); - hsize_t f_f_count[] = - {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; - hsize_t f_f_start[] = {cache_start_index, 0, 0}; - f_f_space.selectHyperslab(H5S_SELECT_SET, f_f_count, f_f_start); - - dset_frame_.read( - data, H5::PredType::NATIVE_UINT16, b_f_space, f_f_space); -} diff --git a/sf-replay/src/ReplayZmqSender.cpp b/sf-replay/src/ReplayZmqSender.cpp deleted file mode 100644 index 3d982ae..0000000 --- a/sf-replay/src/ReplayZmqSender.cpp +++ /dev/null @@ -1,48 +0,0 @@ -#include "ReplayZmqSender.hpp" - -#include -#include - -#include "buffer_config.hpp" - -using namespace std; -using namespace core_buffer; - - -ReplayZmqSender::ReplayZmqSender(const string& ipc_id, const int source_id) -{ - auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-"; - stringstream ipc_stream; - ipc_stream << ipc_base << source_id; - const auto ipc_address = ipc_stream.str(); - - ctx_ = zmq_ctx_new(); - socket_ = zmq_socket(ctx_, ZMQ_PUSH); - - const int sndhwm = REPLAY_SNDHWM; - if (zmq_setsockopt(socket_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) - throw runtime_error(zmq_strerror (errno)); - - const int linger_ms = -1; - if (zmq_setsockopt(socket_, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) - throw runtime_error(zmq_strerror (errno)); - - if (zmq_bind(socket_, ipc_address.c_str()) != 0) - throw runtime_error(zmq_strerror (errno)); -} - -ReplayZmqSender::~ReplayZmqSender() -{ - close(); -} - -void ReplayZmqSender::close() { - zmq_close(socket_); - zmq_ctx_destroy(ctx_); -} - -void ReplayZmqSender::send(const ReplayBuffer* metadata, const char* data) -{ - zmq_send(socket_, metadata, sizeof(ReplayBuffer), ZMQ_SNDMORE); - zmq_send(socket_, data, MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE, 0); -} \ No newline at end of file diff --git a/sf-replay/src/main.cpp b/sf-replay/src/main.cpp deleted file mode 100644 index e2cddc8..0000000 --- a/sf-replay/src/main.cpp +++ /dev/null @@ -1,135 +0,0 @@ -#include -#include -#include -#include - -#include "buffer_config.hpp" -#include "ReplayH5Reader.hpp" -#include "ReplayZmqSender.hpp" - -using namespace std; -using namespace core_buffer; -using namespace chrono; - -void sf_replay ( - const string device, - const string channel_name, - FastQueue& queue, - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id - ) -{ - ReplayH5Reader file_reader(device, channel_name); - - // "<= stop_pulse_id" because we include the stop_pulse_id in the file. - for (uint64_t curr_pulse_id=start_pulse_id; - curr_pulse_id <= stop_pulse_id; - curr_pulse_id++) { - - int slot_id; - while((slot_id = queue.reserve()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } - - auto start_time = steady_clock::now(); - - auto metadata = queue.get_metadata_buffer(slot_id); - auto buffer = queue.get_data_buffer(slot_id); - - file_reader.get_buffer(curr_pulse_id, metadata, buffer); - - auto end_time = steady_clock::now(); - uint64_t read_us_duration = - duration_cast(end_time-start_time).count(); - - queue.commit(); - - // TODO: Proper statistics - cout << "sf_replay:avg_read_us "; - cout << read_us_duration / REPLAY_READ_BUFFER_SIZE << endl; - } -} - -int main (int argc, char *argv[]) { - - if (argc != 7) { - cout << endl; - cout << "Usage: sf_replay [ipc_id] [device]"; - cout << " [channel_name] [source_id] [start_pulse_id] [stop_pulse_id]"; - cout << endl; - cout << "\tdevice: Name of detector." << endl; - cout << "\tchannel_name: M00-M31 for JF16M." << endl; - cout << "\tsource_id: Module index" << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; - cout << endl; - - exit(-1); - } - - const string ipc_id = string(argv[1]); - const string device = string(argv[2]); - const string channel_name = string(argv[3]); - const auto source_id = atoi(argv[4]); - const auto start_pulse_id = (uint64_t) atoll(argv[5]); - const auto stop_pulse_id = (uint64_t) atoll(argv[6]); - - FastQueue queue( - MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE, - REPLAY_FASTQUEUE_N_SLOTS); - - thread file_read_thread(sf_replay, - device, channel_name, ref(queue), - start_pulse_id, stop_pulse_id); - - uint64_t send_us = 0; - uint64_t max_send_us = 0; - uint64_t n_stats = 0; - - ReplayZmqSender sender(ipc_id, source_id); - - // "<= stop_pulse_id" because we include the stop_pulse_id in the file. - for (uint64_t curr_pulse_id=start_pulse_id; - curr_pulse_id <= stop_pulse_id; - curr_pulse_id++) { - - int slot_id; - while((slot_id = queue.read()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } - - auto m_buffer = queue.get_metadata_buffer(slot_id); - auto f_buffer = queue.get_data_buffer(slot_id); - - auto start_time = steady_clock::now(); - - sender.send(m_buffer, f_buffer); - - auto end_time = steady_clock::now(); - uint64_t send_us_duration = - duration_cast(end_time-start_time).count(); - - queue.release(); - - // TODO: Proper statistics - n_stats++; - - send_us += send_us_duration; - max_send_us = max(max_send_us, send_us_duration); - - if (n_stats == STATS_MODULO) { - cout << "sf_replay:avg_send_us " << send_us / STATS_MODULO; - cout << " sf_replay:max_send_us " << max_send_us; - cout << endl; - - n_stats = 0; - send_us = 0; - max_send_us = 0; - } - } - - file_read_thread.join(); - return 0; -} diff --git a/sf-replay/test/CMakeLists.txt b/sf-replay/test/CMakeLists.txt deleted file mode 100644 index 45588cf..0000000 --- a/sf-replay/test/CMakeLists.txt +++ /dev/null @@ -1,10 +0,0 @@ -add_executable(sf-replay-tests main.cpp) - -target_link_libraries(sf-replay-tests - core-buffer-lib - sf-buffer-lib - sf-replay-lib - hdf5 - hdf5_cpp - gtest - ) diff --git a/sf-replay/test/main.cpp b/sf-replay/test/main.cpp deleted file mode 100644 index d40a5df..0000000 --- a/sf-replay/test/main.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "gtest/gtest.h" -#include "test_ReplayH5Reader.cpp" - -using namespace std; - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/sf-replay/test/test_ReplayH5Reader.cpp b/sf-replay/test/test_ReplayH5Reader.cpp deleted file mode 100644 index 17be0a0..0000000 --- a/sf-replay/test/test_ReplayH5Reader.cpp +++ /dev/null @@ -1,73 +0,0 @@ -#include -#include - -#include "ReplayH5Reader.hpp" -#include "BufferH5Writer.hpp" - -using namespace std; -using namespace core_buffer; - -TEST(ReplayH5Reader, basic_interaction) -{ - auto root_folder = "."; - auto device_name = "fast_device"; - size_t pulse_id = 65; - uint16_t source_id = 124; - - // This 2 must be compatible by design. - BufferH5Writer writer(root_folder, device_name); - ReplayH5Reader reader(root_folder, device_name); - - ModuleFrame w_metadata; - ModuleFrame* r_metadata; - auto w_frame_buffer = make_unique(MODULE_N_PIXELS); - char* r_frame_buffer; - - // Setup test values. - w_metadata.pulse_id = pulse_id; - w_metadata.frame_index = 2; - w_metadata.daq_rec = 3; - w_metadata.n_received_packets = 128; - w_metadata.module_id = source_id; - - for (size_t i=0; ipulse_id, pulse_id); - ASSERT_EQ(r_metadata->module_id, source_id); - ASSERT_EQ(r_metadata->frame_index, 2); - ASSERT_EQ(r_metadata->daq_rec, 3); - ASSERT_EQ(r_metadata->n_received_packets, 128); - - // Data as well. - auto offset = MODULE_N_PIXELS * (pulse_id-1); - for (size_t i=0; ipulse_id, 0); - ASSERT_EQ(r_metadata->frame_index, 0); - ASSERT_EQ(r_metadata->daq_rec, 0); - ASSERT_EQ(r_metadata->n_received_packets, 0); - ASSERT_EQ(r_metadata->module_id, 0); - } - - reader.close_file(); -} diff --git a/sf-stream/CMakeLists.txt b/sf-stream/CMakeLists.txt index 216e2dd..a20ccab 100644 --- a/sf-stream/CMakeLists.txt +++ b/sf-stream/CMakeLists.txt @@ -10,9 +10,11 @@ target_link_libraries(sf-stream-lib add_executable(sf-stream src/main.cpp) set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream) target_link_libraries(sf-stream + external sf-stream-lib zmq - pthread) + pthread + jsoncpp) enable_testing() -add_subdirectory(test/) \ No newline at end of file +add_subdirectory(test/) diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index 2c236ef..d9a4b45 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -10,35 +10,45 @@ #include #include #include -#include "date.h" -#include +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" +#include +#include using namespace std; using namespace core_buffer; int main (int argc, char *argv[]) { - if (argc != 5) { + if (argc != 2) { cout << endl; cout << "Usage: sf_stream "; - cout << " [streamvis_address] [reduction_factor_streamvis]"; - cout << " [live_analysis_address] [reduction_factor_live_analysis]"; + cout << " [config_json_file]"; cout << endl; - cout << "\tstreamvis_address: address to streamvis, example tcp://129.129.241.42:9007" << endl; - cout << "\treduction_factor_streamvis: 1 out of N (example 10) images to send to streamvis. For remaining send metadata." << endl; - cout << "\tlive_analysis_address: address to live_analysis, example tcp://129.129.241.42:9107" << endl; - cout << "\treduction_factor_live_analysis: 1 out of N (example 10) images to send to live analysis. For remaining send metadata. N<=1 - send every image" << endl; + cout << "\tconfig_json_file: json file with the configuration parameters(detector name, number of modules, pedestal and gain files" << endl; cout << endl; exit(-1); } - string streamvis_address = string(argv[1]); - int reduction_factor_streamvis = (int) atoll(argv[2]); - string live_analysis_address = string(argv[3]); - int reduction_factor_live_analysis = (uint64_t) atoll(argv[4]); + string config_json_file = string(argv[1]); + + ifstream ifs(config_json_file); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + string streamvis_address = config_parameters["streamvis_stream"].GetString(); + int reduction_factor_streamvis = config_parameters["streamvis_rate"].GetInt(); + string live_analysis_address = config_parameters["live_stream"].GetString(); + int reduction_factor_live_analysis = config_parameters["live_rate"].GetInt(); + + const string PEDE_FILENAME = config_parameters["pedestal_file"].GetString(); + const string GAIN_FILENAME = config_parameters["gain_file"].GetString(); + const string DETECTOR_NAME = config_parameters["detector_name"].GetString(); + size_t n_modules = config_parameters["n_modules"].GetInt(); - size_t n_modules = 32; FastQueue queue( n_modules * MODULE_N_BYTES, STREAM_FASTQUEUE_SLOTS); @@ -46,7 +56,9 @@ int main (int argc, char *argv[]) auto ctx = zmq_ctx_new(); zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); - LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL); + const string LIVE_IPC_URL = BUFFER_LIVE_IPC_URL+DETECTOR_NAME+"-"; + + LiveRecvModule recv_module(queue, n_modules, ctx, LIVE_IPC_URL); // 0mq sockets to streamvis and live analysis void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB); @@ -59,8 +71,8 @@ int main (int argc, char *argv[]) } uint16_t data_empty [] = { 0, 0, 0, 0}; - Json::Value header; - Json::StreamWriterBuilder builder; + + // TODO: Remove stats trash. int stats_counter = 0; @@ -70,6 +82,10 @@ int main (int argc, char *argv[]) while (true) { + rapidjson::Document header(rapidjson::kObjectType); + auto& header_alloc = header.GetAllocator(); + string text_header; + auto start_time = chrono::steady_clock::now(); auto slot_id = queue.read(); @@ -112,39 +128,63 @@ int main (int argc, char *argv[]) } } - //Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) + // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) - header["frame"] = (Json::Value::UInt64)frame_index; - header["is_good_frame"] = is_good_frame; - header["daq_rec"] = (Json::Value::UInt64)daq_rec; - header["pulse_id"] = (Json::Value::UInt64)pulse_id; + header.AddMember("frame", frame_index, header_alloc); + header.AddMember("is_good_frame", is_good_frame, header_alloc); + header.AddMember("daq_rec", daq_rec, header_alloc); + header.AddMember("pulse_id", pulse_id, header_alloc); - //this needs to be re-read from external source - header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5"; - header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5"; + rapidjson::Value pedestal_file; + pedestal_file.SetString(PEDE_FILENAME.c_str(), header_alloc); + header.AddMember("pedestal_file", pedestal_file, header_alloc); - header["number_frames_expected"] = 10000; - header["run_name"] = to_string(uint64_t(pulse_id/10000)*10000); + rapidjson::Value gain_file; + gain_file.SetString(GAIN_FILENAME.c_str(), header_alloc); + header.AddMember("gain_file", gain_file, header_alloc); - // detector name should come as parameter to sf_stream - header["detector_name"] = "JF07T32V01"; + header.AddMember("number_frames_expected", 10000, header_alloc); - header["htype"] = "array-1.0"; - header["type"] = "uint16"; + rapidjson::Value run_name; + run_name.SetString( + to_string(uint64_t(pulse_id/10000)*10000).c_str(), + header_alloc); + header.AddMember("run_name", run_name, header_alloc); + + rapidjson::Value detector_name; + detector_name.SetString(DETECTOR_NAME.c_str(), header_alloc); + header.AddMember("detector_name", detector_name, header_alloc); + + header.AddMember("htype", "array-1.0", header_alloc); + header.AddMember("type", "uint16", header_alloc); + + // To be retrieved and filled with correct values down. + auto shape_value = rapidjson::Value(rapidjson::kArrayType); + shape_value.PushBack((uint64_t)0, header_alloc); + shape_value.PushBack((uint64_t)0, header_alloc); + header.AddMember("shape", shape_value, header_alloc); int send_streamvis = 0; if ( reduction_factor_streamvis > 1 ) { send_streamvis = rand() % reduction_factor_streamvis; } if ( send_streamvis == 0 ) { - header["shape"][0] = 16384; - header["shape"][1] = 1024; + auto& shape = header["shape"]; + shape[0] = n_modules*512; + shape[1] = 1024; } else{ - header["shape"][0] = 2; - header["shape"][1] = 2; + auto& shape = header["shape"]; + shape[0] = 2; + shape[1] = 2; } - string text_header = Json::writeString(builder, header); + { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + text_header = buffer.GetString(); + } zmq_send(socket_streamvis, text_header.c_str(), @@ -169,14 +209,22 @@ int main (int argc, char *argv[]) send_live_analysis = rand() % reduction_factor_live_analysis; } if ( send_live_analysis == 0 ) { - header["shape"][0] = 16384; - header["shape"][1] = 1024; + auto& shape = header["shape"]; + shape[0] = n_modules*512; + shape[1] = 1024; } else{ - header["shape"][0] = 2; - header["shape"][1] = 2; + auto& shape = header["shape"]; + shape[0] = 2; + shape[1] = 2; } - text_header = Json::writeString(builder, header); + { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + text_header = buffer.GetString(); + } zmq_send(socket_live, text_header.c_str(), diff --git a/sf-writer/CMakeLists.txt b/sf-writer/CMakeLists.txt index a66a5b1..5b520f5 100644 --- a/sf-writer/CMakeLists.txt +++ b/sf-writer/CMakeLists.txt @@ -13,6 +13,7 @@ target_link_libraries(sf-writer sf-writer-lib zmq hdf5 + hdf5_hl hdf5_cpp pthread ) diff --git a/sf-writer/include/BufferBinaryReader.hpp b/sf-writer/include/BufferBinaryReader.hpp new file mode 100644 index 0000000..cca17fc --- /dev/null +++ b/sf-writer/include/BufferBinaryReader.hpp @@ -0,0 +1,28 @@ +#ifndef SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP +#define SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP + + +#include + +class BufferBinaryReader { + + const std::string device_; + const std::string channel_name_; + + std::string current_input_file_; + int input_file_fd_; + + void open_file(const std::string& filename); + void close_current_file(); + +public: + BufferBinaryReader(const std::string &device, + const std::string &channel_name); + + virtual ~BufferBinaryReader(); + + void get_block(const uint64_t block_id, BufferBinaryBlock *buffer); +}; + + +#endif //SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP diff --git a/sf-writer/include/ImageAssembler.hpp b/sf-writer/include/ImageAssembler.hpp new file mode 100644 index 0000000..4aa13c9 --- /dev/null +++ b/sf-writer/include/ImageAssembler.hpp @@ -0,0 +1,41 @@ +#ifndef SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP +#define SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP + +#include + +#include "formats.hpp" + +const size_t IA_N_SLOTS = 2; + +class ImageAssembler { + const size_t n_modules_; + const size_t image_buffer_slot_n_bytes_; + + char* image_buffer_; + ImageMetadataBlock* meta_buffer_; + ModuleFrame* frame_meta_buffer_; + std::atomic_int* buffer_status_; + + size_t get_data_offset(const uint64_t slot_id, const int i_module); + size_t get_metadata_offset(const uint64_t slot_id, const int i_module); + +public: + ImageAssembler(const size_t n_modules); + + virtual ~ImageAssembler(); + + bool is_slot_free(const uint64_t bunch_id); + bool is_slot_full(const uint64_t bunch_id); + + void process(uint64_t bunch_id, + const int i_module, + const BufferBinaryBlock* block_buffer); + + void free_slot(const uint64_t bunch_id); + + ImageMetadataBlock* get_metadata_buffer(const uint64_t bunch_id); + char* get_data_buffer(const uint64_t bunch_id); +}; + + +#endif //SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP diff --git a/sf-writer/include/JFH5Writer.hpp b/sf-writer/include/JFH5Writer.hpp new file mode 100644 index 0000000..7432122 --- /dev/null +++ b/sf-writer/include/JFH5Writer.hpp @@ -0,0 +1,40 @@ +#ifndef SFWRITER_HPP +#define SFWRITER_HPP + +#include +#include +#include +#include "buffer_config.hpp" +#include "formats.hpp" + +class JFH5Writer { + + const uint64_t start_pulse_id_; + const uint64_t stop_pulse_id_; + const size_t n_modules_; + const size_t n_images_; + size_t current_write_index_; + + H5::H5File file_; + + H5::DataSet image_dataset_; + + void close_file(); + +public: + JFH5Writer(const std::string& output_file, + const uint64_t start_pulse_id, + const uint64_t stop_pulse_id, + const size_t n_modules); + ~JFH5Writer(); + void write(const ImageMetadataBlock* metadata, const char* data); + + + uint64_t* b_pulse_id_; + uint64_t* b_frame_index_; + uint32_t* b_daq_rec_; + uint8_t* b_is_good_frame_ ; +}; + + +#endif //SFWRITER_HPP diff --git a/sf-writer/include/WriterH5Writer.hpp b/sf-writer/include/WriterH5Writer.hpp deleted file mode 100644 index d1983d0..0000000 --- a/sf-writer/include/WriterH5Writer.hpp +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef SFWRITER_HPP -#define SFWRITER_HPP - -#include -#include -#include -#include "buffer_config.hpp" -#include "formats.hpp" - -class WriterH5Writer { - - const size_t n_frames_; - const size_t n_modules_; - size_t current_write_index_; - - H5::H5File file_; - - H5::DataSet image_dataset_; - H5::DataSet pulse_id_dataset_; - H5::DataSet frame_index_dataset_; - H5::DataSet daq_rec_dataset_; - H5::DataSet is_good_frame_dataset_; - - -public: - WriterH5Writer(const std::string& output_file, - const size_t n_frames, - const size_t n_modules); - ~WriterH5Writer(); - void write(const ImageMetadataBuffer* metadata, const char* data); - void close_file(); -}; - - -#endif //SFWRITER_HPP diff --git a/sf-writer/include/WriterZmqReceiver.hpp b/sf-writer/include/WriterZmqReceiver.hpp deleted file mode 100644 index b02f9e0..0000000 --- a/sf-writer/include/WriterZmqReceiver.hpp +++ /dev/null @@ -1,34 +0,0 @@ -#ifndef SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP -#define SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP - -#include -#include "WriterH5Writer.hpp" -#include -#include - - -class WriterZmqReceiver { - - const size_t n_modules_; - std::vector sockets_; - const uint64_t stop_pulse_id_; - - ModuleFrame f_meta_; - -public: - WriterZmqReceiver( - void *ctx, - const std::string& ipc_prefix, - const size_t n_modules, - const uint64_t stop_pulse_id); - - virtual ~WriterZmqReceiver(); - - void get_next_buffer( - const uint64_t start_pulse_id, - ImageMetadataBuffer* i_meta, - char* image_buffer); -}; - - -#endif //SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP diff --git a/sf-writer/src/BufferBinaryReader.cpp b/sf-writer/src/BufferBinaryReader.cpp new file mode 100644 index 0000000..0c1eeee --- /dev/null +++ b/sf-writer/src/BufferBinaryReader.cpp @@ -0,0 +1,123 @@ +#include "BufferBinaryReader.hpp" + +#include +#include +#include +#include +#include + +#include "BufferUtils.hpp" + +using namespace std; +using namespace core_buffer; + +BufferBinaryReader::BufferBinaryReader( + const std::string &device, + const std::string &channel_name) : + device_(device), + channel_name_(channel_name), + current_input_file_(""), + input_file_fd_(-1) +{ + +} + +BufferBinaryReader::~BufferBinaryReader() +{ + close_current_file(); +} + +void BufferBinaryReader::get_block( + const uint64_t block_id, BufferBinaryBlock* buffer) +{ + uint64_t block_start_pulse_id = block_id * BUFFER_BLOCK_SIZE; + auto current_block_file = BufferUtils::get_filename( + device_, channel_name_, block_start_pulse_id); + + if (current_block_file != current_input_file_) { + open_file(current_block_file); + } + + size_t file_start_index = + BufferUtils::get_file_frame_index(block_start_pulse_id); + size_t n_bytes_offset = file_start_index * sizeof(BufferBinaryFormat); + + auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET); + if (lseek_result < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryReader::get_block]"; + err_msg << " Error while lseek on file "; + err_msg << current_input_file_; + err_msg << " for n_bytes_offset "; + err_msg << n_bytes_offset << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + auto n_bytes = ::read(input_file_fd_, buffer, + sizeof(BufferBinaryFormat) * BUFFER_BLOCK_SIZE); + + if (n_bytes < sizeof(BufferBinaryFormat)) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryReader::get_block]"; + err_msg << " Error while reading from file "; + err_msg << current_input_file_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } +} + +void BufferBinaryReader::open_file(const std::string& filename) +{ + close_current_file(); + + input_file_fd_ = open(filename.c_str(), O_RDONLY); + + if (input_file_fd_ < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryReader::open_file]"; + err_msg << " Cannot open file "; + err_msg << filename << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + current_input_file_ = filename; +} + +void BufferBinaryReader::close_current_file() +{ + if (input_file_fd_ != -1) { + if (close(input_file_fd_) < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BinaryWriter::close_current_file]"; + err_msg << " Error while closing file "; + err_msg << current_input_file_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + input_file_fd_ = -1; + current_input_file_ = ""; + } +} diff --git a/sf-writer/src/ImageAssembler.cpp b/sf-writer/src/ImageAssembler.cpp new file mode 100644 index 0000000..73a0820 --- /dev/null +++ b/sf-writer/src/ImageAssembler.cpp @@ -0,0 +1,171 @@ +#include +#include "ImageAssembler.hpp" + +using namespace std; +using namespace core_buffer; + +ImageAssembler::ImageAssembler(const size_t n_modules) : + n_modules_(n_modules), + image_buffer_slot_n_bytes_(BUFFER_BLOCK_SIZE * MODULE_N_BYTES * n_modules_) +{ + image_buffer_ = new char[IA_N_SLOTS * image_buffer_slot_n_bytes_]; + meta_buffer_ = new ImageMetadataBlock[IA_N_SLOTS]; + frame_meta_buffer_ = + new ModuleFrame[IA_N_SLOTS * n_modules * BUFFER_BLOCK_SIZE]; + buffer_status_ = new atomic_int[IA_N_SLOTS]; + + for (size_t i=0; i 0; +} + +bool ImageAssembler::is_slot_full(const uint64_t bunch_id) +{ + auto slot_id = bunch_id % IA_N_SLOTS; + return buffer_status_[slot_id].load() == 0; +} + +size_t ImageAssembler::get_data_offset( + const uint64_t slot_id, const int i_module) +{ + size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_; + size_t module_i_offset = i_module * MODULE_N_BYTES; + + return slot_i_offset + module_i_offset; +} + +size_t ImageAssembler::get_metadata_offset( + const uint64_t slot_id, const int i_module) +{ + size_t n_metadata_in_slot = n_modules_ * BUFFER_BLOCK_SIZE; + size_t slot_m_offset = slot_id * n_metadata_in_slot; + size_t module_m_offset = i_module; + + return slot_m_offset + module_m_offset; +} + +void ImageAssembler::process( + const uint64_t bunch_id, + const int i_module, + const BufferBinaryBlock* block_buffer) +{ + const auto slot_id = bunch_id % IA_N_SLOTS; + + auto image_offset = get_data_offset(slot_id, i_module); + const auto image_offset_step = MODULE_N_BYTES * n_modules_; + + auto meta_offset = get_metadata_offset(slot_id, i_module); + const auto meta_offset_step = n_modules_; + + for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) { + + memcpy( + &(frame_meta_buffer_[meta_offset]), + &(block_buffer->frame[i_pulse].metadata), + sizeof(ModuleFrame)); + + meta_offset += meta_offset_step; + + memcpy( + image_buffer_ + image_offset, + &(block_buffer->frame[i_pulse].data[0]), + MODULE_N_BYTES); + + image_offset += image_offset_step; + } + + buffer_status_[slot_id].fetch_sub(1); +} + +void ImageAssembler::free_slot(const uint64_t bunch_id) +{ + auto slot_id = bunch_id % IA_N_SLOTS; + buffer_status_[slot_id].store(n_modules_); +} + +ImageMetadataBlock* ImageAssembler::get_metadata_buffer(const uint64_t bunch_id) +{ + const auto slot_id = bunch_id % IA_N_SLOTS; + + auto& image_pulse_id = meta_buffer_[slot_id].pulse_id; + auto& image_frame_index = meta_buffer_[slot_id].frame_index; + auto& image_daq_rec = meta_buffer_[slot_id].daq_rec; + auto& image_is_good_frame = meta_buffer_[slot_id].is_good_image; + + auto meta_offset = get_metadata_offset(slot_id, 0); + const auto meta_offset_step = 1; + + uint64_t start_pulse_id = bunch_id * BUFFER_BLOCK_SIZE; + meta_buffer_[slot_id].block_start_pulse_id = start_pulse_id; + + uint64_t stop_pulse_id = start_pulse_id + BUFFER_BLOCK_SIZE - 1; + meta_buffer_[slot_id].block_stop_pulse_id = stop_pulse_id; + + for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) { + + auto is_pulse_init = false; + image_is_good_frame[i_pulse] = 1; + + for (size_t i_module=0; i_module < n_modules_; i_module++) { + + auto& frame_meta = frame_meta_buffer_[meta_offset]; + auto is_good_frame = + frame_meta.n_received_packets == JF_N_PACKETS_PER_FRAME; + + if (!is_good_frame) { + image_is_good_frame[i_pulse] = 0; + // TODO: Update meta_offset only once in the loop. + meta_offset += meta_offset_step; + continue; + } + + if (!is_pulse_init) { + image_pulse_id[i_pulse] = frame_meta.pulse_id; + image_frame_index[i_pulse] = frame_meta.frame_index; + image_daq_rec[i_pulse] = frame_meta.daq_rec; + + is_pulse_init = true; + } + + if (image_is_good_frame[i_pulse] == 1) { + if (image_pulse_id[i_pulse] != frame_meta.pulse_id) { + image_is_good_frame[i_pulse] = 0; + } + + if (image_frame_index[i_pulse] != frame_meta.frame_index) { + image_is_good_frame[i_pulse] = 0; + } + + if (image_daq_rec[i_pulse] != frame_meta.daq_rec) { + image_is_good_frame[i_pulse] = 0; + } + + if (frame_meta.n_received_packets != JF_N_PACKETS_PER_FRAME) { + image_is_good_frame[i_pulse] = 0; + } + } + + meta_offset += meta_offset_step; + } + } + + return &(meta_buffer_[slot_id]); +} + +char* ImageAssembler::get_data_buffer(const uint64_t bunch_id) +{ + auto slot_id = bunch_id % IA_N_SLOTS; + return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_); +} diff --git a/sf-writer/src/JFH5Writer.cpp b/sf-writer/src/JFH5Writer.cpp new file mode 100644 index 0000000..17a2110 --- /dev/null +++ b/sf-writer/src/JFH5Writer.cpp @@ -0,0 +1,205 @@ +#include "JFH5Writer.hpp" +#include +#include +#include + + +//extern "C" +//{ +// #include "H5DOpublic.h" +// #include +//} + +using namespace std; +using namespace core_buffer; + +JFH5Writer::JFH5Writer(const std::string& output_file, + const uint64_t start_pulse_id, + const uint64_t stop_pulse_id, + const size_t n_modules) : + start_pulse_id_(start_pulse_id), + stop_pulse_id_(stop_pulse_id), + n_modules_(n_modules), + n_images_(stop_pulse_id - start_pulse_id + 1), + current_write_index_(0) +{ + +// bshuf_register_h5filter(); + + file_ = H5::H5File(output_file, H5F_ACC_TRUNC); + + hsize_t image_dataset_dims[3] = + {n_images_, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; + + H5::DataSpace image_dataspace(3, image_dataset_dims); + +// auto chunk_size = min(n_images_, BUFFER_BLOCK_SIZE); +// hsize_t image_dataset_chunking[3] = +// {chunk_size, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; +// H5::DSetCreatPropList image_dataset_properties; +// image_dataset_properties.setChunk(3, image_dataset_chunking); + +// // block_size, compression type +// uint compression_prop[] = +// {MODULE_N_PIXELS, //block size +// BSHUF_H5_COMPRESS_LZ4}; // Compression type +// +// H5Pset_filter(image_dataset_properties.getId(), +// BSHUF_H5FILTER, +// H5Z_FLAG_MANDATORY, +// 2, +// &(compression_prop[0])); + + image_dataset_ = file_.createDataSet( + "image", + H5::PredType::NATIVE_UINT16, + image_dataspace); + + b_pulse_id_ = new uint64_t[n_images_]; + b_frame_index_= new uint64_t[n_images_]; + b_daq_rec_ = new uint32_t[n_images_]; + b_is_good_frame_ = new uint8_t[n_images_]; +} + +JFH5Writer::~JFH5Writer() +{ + close_file(); + + delete[] b_pulse_id_; + delete[] b_frame_index_; + delete[] b_daq_rec_; + delete[] b_is_good_frame_; +} + +void JFH5Writer::close_file() +{ + if (file_.getId() == -1) { + return; + } + + image_dataset_.close(); + + hsize_t b_m_dims[2] = {n_images_, 1}; + H5::DataSpace b_m_space (2, b_m_dims); + + hsize_t f_m_dims[] = {n_images_, 1}; + H5::DataSpace f_m_space(2, f_m_dims); + + auto pulse_id_dataset = file_.createDataSet( + "pulse_id", + H5::PredType::NATIVE_UINT64, + f_m_space); + pulse_id_dataset.write( + b_pulse_id_, H5::PredType::NATIVE_UINT64, + b_m_space, f_m_space); + pulse_id_dataset.close(); + + auto frame_index_dataset = file_.createDataSet( + "frame_index", + H5::PredType::NATIVE_UINT64, + f_m_space); + frame_index_dataset.write( + b_frame_index_, H5::PredType::NATIVE_UINT64, + b_m_space, f_m_space); + frame_index_dataset.close(); + + auto daq_rec_dataset = file_.createDataSet( + "daq_rec", + H5::PredType::NATIVE_UINT32, + f_m_space); + daq_rec_dataset.write( + b_daq_rec_, H5::PredType::NATIVE_UINT32, + b_m_space, f_m_space); + daq_rec_dataset.close(); + + auto is_good_frame_dataset = file_.createDataSet( + "is_good_frame", + H5::PredType::NATIVE_UINT8, + f_m_space); + is_good_frame_dataset.write( + b_is_good_frame_, H5::PredType::NATIVE_UINT8, + b_m_space, f_m_space); + is_good_frame_dataset.close(); + + file_.close(); +} + +void JFH5Writer::write( + const ImageMetadataBlock* metadata, const char* data) +{ + size_t n_images_offset = 0; + if (start_pulse_id_ > metadata->block_start_pulse_id) { + n_images_offset = start_pulse_id_ - metadata->block_start_pulse_id; + } + + if (n_images_offset > BUFFER_BLOCK_SIZE) { + throw runtime_error("Received unexpected block for start_pulse_id."); + } + + size_t n_images_to_copy = BUFFER_BLOCK_SIZE - n_images_offset; + if (stop_pulse_id_ < metadata->block_stop_pulse_id) { + n_images_to_copy -= metadata->block_stop_pulse_id - stop_pulse_id_; + } + + if (n_images_to_copy < 1) { + throw runtime_error("Received unexpected block for stop_pulse_id."); + } + + hsize_t b_i_dims[3] = {BUFFER_BLOCK_SIZE, + MODULE_Y_SIZE * n_modules_, + MODULE_X_SIZE}; + H5::DataSpace b_i_space(3, b_i_dims); + hsize_t b_i_count[] = {n_images_to_copy, + MODULE_Y_SIZE * n_modules_, + MODULE_X_SIZE}; + hsize_t b_i_start[] = {n_images_offset, 0, 0}; + b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); + + hsize_t f_i_dims[3] = {n_images_, + MODULE_Y_SIZE * n_modules_, + MODULE_X_SIZE}; + H5::DataSpace f_i_space(3, f_i_dims); + hsize_t f_i_count[] = {n_images_to_copy, + MODULE_Y_SIZE * n_modules_, + MODULE_X_SIZE}; + hsize_t f_i_start[] = {current_write_index_, 0, 0}; + f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); + + image_dataset_.write( + data, H5::PredType::NATIVE_UINT16, b_i_space, f_i_space); + + + // pulse_id + { + auto b_current_ptr = b_pulse_id_ + current_write_index_; + memcpy(b_current_ptr, + &(metadata->pulse_id[n_images_offset]), + sizeof(uint64_t) * n_images_to_copy); + } + + // frame_index + { + auto b_current_ptr = b_frame_index_ + current_write_index_; + memcpy(b_current_ptr, + &(metadata->frame_index[n_images_offset]), + sizeof(uint64_t) * n_images_to_copy); + } + + // daq_rec + { + auto b_current_ptr = b_daq_rec_ + current_write_index_; + memcpy(b_current_ptr, + &(metadata->daq_rec[n_images_offset]), + sizeof(uint32_t) * n_images_to_copy); + } + + // is_good_frame + { + auto b_current_ptr = b_is_good_frame_ + current_write_index_; + memcpy(b_current_ptr, + &(metadata->is_good_image[n_images_offset]), + sizeof(uint8_t) * n_images_to_copy); + } + + current_write_index_ += n_images_to_copy; +} diff --git a/sf-writer/src/WriterH5Writer.cpp b/sf-writer/src/WriterH5Writer.cpp deleted file mode 100644 index 0d921c0..0000000 --- a/sf-writer/src/WriterH5Writer.cpp +++ /dev/null @@ -1,162 +0,0 @@ -#include "WriterH5Writer.hpp" -#include - - -//extern "C" -//{ -// #include "H5DOpublic.h" -// #include -//} - -using namespace std; -using namespace core_buffer; - -WriterH5Writer::WriterH5Writer( - const string& output_file, - const size_t n_frames, - const size_t n_modules) : - n_frames_(n_frames), - n_modules_(n_modules), - current_write_index_(0) -{ - -// bshuf_register_h5filter(); - - file_ = H5::H5File(output_file, H5F_ACC_TRUNC); - - hsize_t image_dataset_dims[3] = - {n_frames_, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; - - H5::DataSpace image_dataspace(3, image_dataset_dims); - - hsize_t image_dataset_chunking[3] = - {1, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DSetCreatPropList image_dataset_properties; - image_dataset_properties.setChunk(3, image_dataset_chunking); - -// // block_size, compression type -// uint compression_prop[] = -// {MODULE_N_PIXELS, //block size -// BSHUF_H5_COMPRESS_LZ4}; // Compression type -// -// H5Pset_filter(image_dataset_properties.getId(), -// BSHUF_H5FILTER, -// H5Z_FLAG_MANDATORY, -// 2, -// &(compression_prop[0])); - - image_dataset_ = file_.createDataSet( - "image", - H5::PredType::NATIVE_UINT16, - image_dataspace, - image_dataset_properties); - - hsize_t metadata_dataset_dims[] = {n_frames_, 1}; - H5::DataSpace metadata_dataspace(2, metadata_dataset_dims); - - // Chunk cannot be larger than n_frames. - auto metadata_chunk_size = WRITER_METADATA_CHUNK_N_IMAGES; - if (n_frames < metadata_chunk_size) { - metadata_chunk_size = n_frames; - } - - hsize_t metadata_dataset_chunking[] = {metadata_chunk_size, 1}; - H5::DSetCreatPropList metadata_dataset_properties; - metadata_dataset_properties.setChunk(2, metadata_dataset_chunking); - - pulse_id_dataset_ = file_.createDataSet( - "pulse_id", - H5::PredType::NATIVE_UINT64, - metadata_dataspace, - metadata_dataset_properties); - - frame_index_dataset_ = file_.createDataSet( - "frame_index", - H5::PredType::NATIVE_UINT64, - metadata_dataspace, - metadata_dataset_properties); - - daq_rec_dataset_ = file_.createDataSet( - "daq_rec", - H5::PredType::NATIVE_UINT32, - metadata_dataspace, - metadata_dataset_properties); - - is_good_frame_dataset_ = file_.createDataSet( - "is_good_frame", - H5::PredType::NATIVE_UINT8, - metadata_dataspace, - metadata_dataset_properties); - -} - -WriterH5Writer::~WriterH5Writer() -{ - close_file(); -} - -void WriterH5Writer::close_file() -{ - image_dataset_.close(); - pulse_id_dataset_.close(); - frame_index_dataset_.close(); - daq_rec_dataset_.close(); - is_good_frame_dataset_.close(); - - file_.close(); -} - -void WriterH5Writer::write( - const ImageMetadataBuffer* metadata, const char* data) -{ - auto n_images_in_buffer = metadata->n_images; - - hsize_t b_i_dims[3] = { - n_images_in_buffer, - MODULE_Y_SIZE*n_modules_, - MODULE_X_SIZE}; - H5::DataSpace b_i_space(3, b_i_dims); - - hsize_t f_i_dims[3] = {n_frames_, - MODULE_Y_SIZE * n_modules_, - MODULE_X_SIZE}; - H5::DataSpace f_i_space(3, f_i_dims); - - hsize_t i_count[] = {n_images_in_buffer, - MODULE_Y_SIZE*n_modules_, - MODULE_X_SIZE}; - hsize_t i_start[] = {current_write_index_, 0, 0}; - f_i_space.selectHyperslab(H5S_SELECT_SET, i_count, i_start); - - image_dataset_.write( - data, H5::PredType::NATIVE_UINT16, - b_i_space, f_i_space); - - hsize_t b_m_dims[2] = {n_images_in_buffer, 1}; - H5::DataSpace b_m_space (2, b_m_dims); - - hsize_t f_m_dims[] = {n_frames_, 1}; - H5::DataSpace f_m_space(2, f_m_dims); - - hsize_t meta_count[] = {n_images_in_buffer, 1}; - hsize_t meta_start[] = {current_write_index_, 0}; - f_m_space.selectHyperslab(H5S_SELECT_SET, meta_count, meta_start); - - pulse_id_dataset_.write( - &(metadata->pulse_id), H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - frame_index_dataset_.write( - &(metadata->frame_index), H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - daq_rec_dataset_.write( - &(metadata->daq_rec), H5::PredType::NATIVE_UINT32, - b_m_space, f_m_space); - - is_good_frame_dataset_.write( - &(metadata->is_good_image), H5::PredType::NATIVE_UINT8, - b_m_space, f_m_space); - - current_write_index_ += n_images_in_buffer; -} diff --git a/sf-writer/src/WriterZmqReceiver.cpp b/sf-writer/src/WriterZmqReceiver.cpp deleted file mode 100644 index 18d38dc..0000000 --- a/sf-writer/src/WriterZmqReceiver.cpp +++ /dev/null @@ -1,138 +0,0 @@ -#include "WriterZmqReceiver.hpp" -#include "zmq.h" -#include "date.h" -#include -#include - -using namespace std; -using namespace core_buffer; - -WriterZmqReceiver::WriterZmqReceiver( - void *ctx, - const string &ipc_prefix, - const size_t n_modules, - const uint64_t stop_pulse_id_) : - n_modules_(n_modules), - sockets_(n_modules), - stop_pulse_id_(stop_pulse_id_) -{ - - for (size_t i = 0; i < n_modules; i++) { - sockets_[i] = zmq_socket(ctx, ZMQ_PULL); - - int rcvhwm = WRITER_RCVHWM; - if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm, - sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - int linger = 0; - if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger, - sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << ipc_prefix << i; - const auto ipc = ipc_addr.str(); - - if (zmq_connect(sockets_[i], ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - } -} - -WriterZmqReceiver::~WriterZmqReceiver() -{ - for (size_t i = 0; i < n_modules_; i++) { - zmq_close(sockets_[i]); - } -} - -void WriterZmqReceiver::get_next_buffer( - const uint64_t start_pulse_id, - ImageMetadataBuffer* i_meta, - char* image_buffer) -{ - auto n_images_in_buffer = WRITER_DATA_CACHE_N_IMAGES; - auto images_left = stop_pulse_id_ - start_pulse_id + 1; - if (images_left < n_images_in_buffer) { - n_images_in_buffer = images_left; - } - - i_meta->n_images = (uint16_t)n_images_in_buffer; - - for (uint64_t i_pulse=0; i_pulsepulse_id[i_pulse] = pulse_id; - i_meta->is_good_image[i_pulse] = 1; - i_meta->frame_index[i_pulse] = 0; - i_meta->daq_rec[i_pulse] = 0; - - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - - auto n_bytes_metadata = zmq_recv( - sockets_[i_module], &f_meta_, sizeof(f_meta_), 0); - - if (n_bytes_metadata != sizeof(f_meta_)) { - throw runtime_error("Wrong number of metadata bytes."); - } - - if (f_meta_.pulse_id == 0) { - i_meta->is_good_image[i_pulse] = 0; - - } else { - if (!pulse_id_initialized) { - // Init the image metadata with the first valid frame. - pulse_id_initialized = true; - - i_meta->frame_index[i_pulse] = f_meta_.frame_index; - i_meta->daq_rec[i_pulse] = f_meta_.daq_rec; - } - - if (f_meta_.pulse_id != i_meta->pulse_id[i_pulse]) { - stringstream err_msg; - - err_msg << "[WriterZmqReceiver::get_next_buffer]"; - err_msg << " Read unexpected pulse_id. "; - err_msg << " Expected " << pulse_id; - err_msg << " received "; - err_msg << f_meta_.pulse_id; - err_msg << " from i_module " << i_module << endl; - - throw runtime_error(err_msg.str()); - } - } - - // Once the image is not good, we don't care to re-flag it. - if (i_meta->is_good_image[i_pulse] == 1) { - - if (f_meta_.frame_index != i_meta->frame_index[i_pulse]) { - i_meta->is_good_image[i_pulse] = 0; - } - - if (f_meta_.daq_rec != i_meta->daq_rec[i_pulse]) { - i_meta->is_good_image[i_pulse] = 0; - } - - if (f_meta_.n_received_packets != JF_N_PACKETS_PER_FRAME) { - i_meta->is_good_image[i_pulse] = 0; - } - } - - auto pulse_offset = i_pulse * n_modules_ * MODULE_N_BYTES ; - auto module_offset = i_module * MODULE_N_BYTES; - - auto n_bytes_image = zmq_recv( - sockets_[i_module], - (image_buffer + pulse_offset + module_offset), - MODULE_N_BYTES, 0); - - if (n_bytes_image != MODULE_N_BYTES) { - throw runtime_error("Wrong number of data bytes."); - } - } - } -} diff --git a/sf-writer/src/main.cpp b/sf-writer/src/main.cpp index 2a3774f..76f4f79 100644 --- a/sf-writer/src/main.cpp +++ b/sf-writer/src/main.cpp @@ -1,75 +1,72 @@ #include -#include -#include "buffer_config.hpp" -#include "zmq.h" #include -#include #include #include -#include "WriterH5Writer.hpp" -#include #include + #include "date.h" +#include "zmq.h" +#include "buffer_config.hpp" #include "bitshuffle/bitshuffle.h" -#include "WriterZmqReceiver.hpp" +#include "JFH5Writer.hpp" +#include "ImageAssembler.hpp" +#include "BufferBinaryReader.hpp" using namespace std; using namespace core_buffer; +using namespace chrono; -void receive_replay( - void* ctx, - const string ipc_prefix, - const size_t n_modules, - FastQueue& queue, - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id) +void read_buffer( + const string device, + const string channel_name, + const int i_module, + const vector& buffer_blocks, + ImageAssembler& image_assembler) { - try { - WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id); + BufferBinaryReader block_reader(device, channel_name); + auto block_buffer = new BufferBinaryBlock(); - uint64_t current_pulse_id=start_pulse_id; + for (uint64_t block_id:buffer_blocks) { - // "<= stop_pulse_id" because we include the last pulse_id. - while(current_pulse_id<=stop_pulse_id) { - - int slot_id; - while((slot_id = queue.reserve()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } - - auto metadata = queue.get_metadata_buffer(slot_id); - auto buffer = queue.get_data_buffer(slot_id); - - receiver.get_next_buffer( - current_pulse_id, metadata, buffer); - - queue.commit(); - current_pulse_id += metadata->n_images; + while(!image_assembler.is_slot_free(block_id)) { + this_thread::sleep_for(chrono::milliseconds( + WRITER_IMAGE_ASSEMBLER_RETRY_MS)); } - } catch (const std::exception& e) { - using namespace date; - using namespace chrono; + auto start_time = steady_clock::now(); - cout << "[" << system_clock::now() << "]"; - cout << "[sf_writer::receive_replay]"; - cout << " Stopped because of exception: " << endl; - cout << e.what() << endl; + block_reader.get_block(block_id, block_buffer); - throw; + auto end_time = steady_clock::now(); + uint64_t read_us_duration = duration_cast( + end_time-start_time).count(); + + start_time = steady_clock::now(); + + image_assembler.process(block_id, i_module, block_buffer); + + end_time = steady_clock::now(); + uint64_t compose_us_duration = duration_cast( + end_time-start_time).count(); + + cout << "sf_replay:avg_read_us "; + cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; + cout << "sf_replay:avg_assemble_us "; + cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl; } + + delete block_buffer; } int main (int argc, char *argv[]) { if (argc != 5) { cout << endl; - cout << "Usage: sf_writer "; - cout << " [ipc_id] [output_file] [start_pulse_id] [stop_pulse_id]"; + cout << "Usage: sf_writer [output_file] [device]"; + cout << " [start_pulse_id] [stop_pulse_id]"; cout << endl; - cout << "\tipc_id: Unique identifier for ipc." << endl; cout << "\toutput_file: Complete path to the output file." << endl; + cout << "\tdevice: Name of detector." << endl; cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; cout << endl; @@ -77,77 +74,75 @@ int main (int argc, char *argv[]) exit(-1); } - const string ipc_id = string(argv[1]); - string output_file = string(argv[2]); + string output_file = string(argv[1]); + const string device = string(argv[2]); uint64_t start_pulse_id = (uint64_t) atoll(argv[3]); uint64_t stop_pulse_id = (uint64_t) atoll(argv[4]); - size_t n_modules = 32; - FastQueue queue( - MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES, - WRITER_FASTQUEUE_N_SLOTS); + uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE; + uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE; - auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); + ImageAssembler image_assembler(n_modules); - auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-"; - thread replay_receive_thread(receive_replay, - ctx, ipc_base, n_modules, - ref(queue), start_pulse_id, stop_pulse_id); + // Generate list of buffer blocks that need to be loaded. + std::vector buffer_blocks; + for (uint64_t curr_block=start_block; + curr_block<=stop_block; + curr_block++) { + buffer_blocks.push_back(curr_block); + } - size_t n_frames = stop_pulse_id - start_pulse_id + 1; - WriterH5Writer writer(output_file, n_frames, n_modules); + std::vector reading_threads(n_modules); + for (size_t i_module=0; i_modulen_images; i++) { - if (metadata->pulse_id[i] != current_pulse_id) { - throw runtime_error("Wrong pulse id from receiver thread."); - } - - current_pulse_id++; - } - - auto end_time = chrono::steady_clock::now(); - auto read_us_duration = chrono::duration_cast( - end_time-start_time).count(); - - start_time = chrono::steady_clock::now(); + auto start_time = steady_clock::now(); writer.write(metadata, data); - end_time = chrono::steady_clock::now(); + auto end_time = steady_clock::now(); auto write_us_duration = chrono::duration_cast( end_time-start_time).count(); - queue.release(); + image_assembler.free_slot(block_id); - auto avg_read_us = read_us_duration / metadata->n_images;; - auto avg_write_us = write_us_duration / metadata->n_images;; - - cout << "sf_writer:avg_read_us " << avg_read_us; - cout << " sf_writer:avg_write_us " << avg_write_us; - cout << endl; + cout << "sf_writer:avg_write_us "; + cout << write_us_duration / BUFFER_BLOCK_SIZE << endl; } - writer.close_file(); + for (auto& reading_thread :reading_threads) { + if (reading_thread.joinable()) { + reading_thread.join(); + } + } - //wait till receive thread is finished - replay_receive_thread.join(); return 0; } diff --git a/sf-writer/test/CMakeLists.txt b/sf-writer/test/CMakeLists.txt index 7666c3e..2b9f4f0 100644 --- a/sf-writer/test/CMakeLists.txt +++ b/sf-writer/test/CMakeLists.txt @@ -3,18 +3,8 @@ add_executable(sf-writer-tests main.cpp) target_link_libraries(sf-writer-tests sf-writer-lib hdf5 + hdf5_hl hdf5_cpp zmq gtest ) - -add_executable(sf-writer-recv manual/test_sf_writer_recv.cpp) -set_target_properties(sf-writer-recv PROPERTIES OUTPUT_NAME sf_writer) -set_target_properties(sf-writer-recv PROPERTIES EXCLUDE_FROM_ALL TRUE) -target_link_libraries(sf-writer-recv - sf-writer-lib - zmq - hdf5 - hdf5_cpp - pthread - ) \ No newline at end of file diff --git a/sf-writer/test/main.cpp b/sf-writer/test/main.cpp index 7078be5..94aeb99 100644 --- a/sf-writer/test/main.cpp +++ b/sf-writer/test/main.cpp @@ -1,6 +1,6 @@ #include "gtest/gtest.h" -#include "test_WriterZmqReceiver.cpp" #include "test_WriterH5Writer.cpp" +#include "test_ImageAssembler.cpp" using namespace std; diff --git a/sf-writer/test/manual/test_sf_writer_recv.cpp b/sf-writer/test/manual/test_sf_writer_recv.cpp deleted file mode 100644 index 4f2e463..0000000 --- a/sf-writer/test/manual/test_sf_writer_recv.cpp +++ /dev/null @@ -1,130 +0,0 @@ -#include -#include "buffer_config.hpp" -#include "zmq.h" -#include -#include -#include -#include "WriterH5Writer.hpp" -#include -#include -#include "date.h" -#include "bitshuffle/bitshuffle.h" -#include "WriterZmqReceiver.hpp" - -using namespace std; -using namespace core_buffer; - -void receive_replay( - void* ctx, - const string ipc_prefix, - const size_t n_modules, - FastQueue& queue, - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id) -{ - try { - WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id); - - int slot_id; - while((slot_id = queue.reserve()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } - - uint64_t pulse_id = start_pulse_id; - // "<= stop_pulse_id" because we include the last pulse_id. - while(pulse_id <= stop_pulse_id) { - auto start_time = chrono::steady_clock::now(); - - auto image_metadata = queue.get_metadata_buffer(slot_id); - auto image_buffer = queue.get_data_buffer(slot_id); - - receiver.get_next_buffer(pulse_id, image_metadata, image_buffer); - - pulse_id += image_metadata->n_images; - - auto end_time = chrono::steady_clock::now(); - auto read_us_duration = chrono::duration_cast( - end_time-start_time).count(); - - cout << "sf_writer::avg_read_us "; - cout << read_us_duration / image_metadata->n_images << endl; - } - - queue.commit(); - - } catch (const std::exception& e) { - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[sf_writer::receive_replay]"; - cout << " Stopped because of exception: " << endl; - cout << e.what() << endl; - - throw; - } -} - -int main (int argc, char *argv[]) -{ - if (argc != 5) { - cout << endl; - cout << "Usage: sf_writer "; - cout << " [ipc_id] [output_file] [start_pulse_id] [stop_pulse_id]"; - cout << endl; - cout << "\tipc_id: Unique identifier for ipc." << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; - cout << endl; - - exit(-1); - } - - const string ipc_id = string(argv[1]); - string output_file = string(argv[2]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[3]); - uint64_t stop_pulse_id = (uint64_t) atoll(argv[4]); - - size_t n_modules = 32; - - FastQueue queue( - MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES, - WRITER_FASTQUEUE_N_SLOTS); - - auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); - - auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-"; - - receive_replay( - ctx, ipc_base, n_modules,queue, start_pulse_id, stop_pulse_id); - -// -// -// auto current_pulse_id = start_pulse_id; -// // "<= stop_pulse_id" because we include the last pulse_id. -// while (current_pulse_id <= stop_pulse_id) { -// -// int slot_id; -// while((slot_id = queue.read()) == -1) { -// this_thread::sleep_for(chrono::milliseconds( -// RB_READ_RETRY_INTERVAL_MS)); -// } -// -// auto metadata = queue.get_metadata_buffer(slot_id); -// -// for (int i_pulse=0; i_pulse < metadata->n_images; i_pulse++) { -// cout << "Written image " << metadata->pulse_id[i_pulse] << endl; -// -// } -// -// queue.release(); -// current_pulse_id += metadata->n_images; -// } -// -// //wait till receive thread is finished -// replay_receive_thread.join(); - return 0; -} diff --git a/sf-writer/test/test_ImageAssembler.cpp b/sf-writer/test/test_ImageAssembler.cpp new file mode 100644 index 0000000..fb98fba --- /dev/null +++ b/sf-writer/test/test_ImageAssembler.cpp @@ -0,0 +1,90 @@ +#include + +#include "ImageAssembler.hpp" +#include "gtest/gtest.h" + +using namespace std; +using namespace core_buffer; + +TEST(ImageAssembler, basic_interaction) +{ + size_t n_modules = 3; + uint64_t bunch_id = 0; + + ImageAssembler assembler(n_modules); + + ASSERT_EQ(assembler.is_slot_free(bunch_id), true); + + auto buffer_block = make_unique(); + auto buffer_ptr = buffer_block.get(); + + for (size_t i_module=0; i_module < n_modules; i_module++) { + assembler.process(bunch_id, i_module, buffer_ptr); + } + + ASSERT_EQ(assembler.is_slot_full(bunch_id), true); + + auto metadata = assembler.get_metadata_buffer(bunch_id); + auto data = assembler.get_data_buffer(bunch_id); + + assembler.free_slot(bunch_id); + ASSERT_EQ(assembler.is_slot_free(bunch_id), true); + + for (size_t i_pulse = 0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) { + ASSERT_EQ(metadata->is_good_image[i_pulse], 0); + } +} + +TEST(ImageAssembler, reconstruction) +{ + size_t n_modules = 2; + uint64_t bunch_id = 0; + + ImageAssembler assembler(n_modules); + + ASSERT_EQ(assembler.is_slot_free(bunch_id), true); + + auto buffer_block = make_unique(); + auto buffer_ptr = buffer_block.get(); + + for (size_t i_module=0; i_module < n_modules; i_module++) { + + for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) { + auto& frame_meta = buffer_block->frame[i_pulse].metadata; + + frame_meta.pulse_id = 100 + i_pulse; + frame_meta.daq_rec = 1000 + i_pulse; + frame_meta.frame_index = 10000 + i_pulse; + frame_meta.n_received_packets = JF_N_PACKETS_PER_FRAME; + + for (size_t i_pixel=0; i_pixel < MODULE_N_PIXELS; i_pixel++) { + buffer_block->frame[i_pulse].data[i_pixel] = + (i_module * 10) + (i_pixel % 100); + } + } + + assembler.process(bunch_id, i_module, buffer_ptr); + } + + ASSERT_EQ(assembler.is_slot_full(bunch_id), true); + + auto metadata = assembler.get_metadata_buffer(bunch_id); + auto data = assembler.get_data_buffer(bunch_id); + + assembler.free_slot(bunch_id); + ASSERT_EQ(assembler.is_slot_free(bunch_id), true); + + ASSERT_EQ(metadata->block_start_pulse_id, 0); + ASSERT_EQ(metadata->block_stop_pulse_id, BUFFER_BLOCK_SIZE-1); + + for (size_t i_pulse = 0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) { + ASSERT_EQ(metadata->pulse_id[i_pulse], 100 + i_pulse); + ASSERT_EQ(metadata->daq_rec[i_pulse], 1000 + i_pulse); + ASSERT_EQ(metadata->frame_index[i_pulse], 10000 + i_pulse); + ASSERT_EQ(metadata->is_good_image[i_pulse], 1); + + for (size_t i_module=0; i_module < n_modules; i_module++) { + // TODO: Check assembled image. + } + } +} diff --git a/sf-writer/test/test_WriterH5Writer.cpp b/sf-writer/test/test_WriterH5Writer.cpp index fd3f91c..c786747 100644 --- a/sf-writer/test/test_WriterH5Writer.cpp +++ b/sf-writer/test/test_WriterH5Writer.cpp @@ -1,92 +1,112 @@ +#include -#include "WriterH5Writer.hpp" +#include "JFH5Writer.hpp" #include "gtest/gtest.h" #include "bitshuffle/bitshuffle.h" - +using namespace std; using namespace core_buffer; TEST(WriterH5Writer, basic_interaction) { size_t n_modules = 2; - size_t n_frames = 5; + uint64_t start_pulse_id = 1; + uint64_t stop_pulse_id = 5; - auto data = make_unique(n_modules*MODULE_N_BYTES); - auto metadata = make_shared(); + auto data = make_unique(n_modules*MODULE_N_BYTES*BUFFER_BLOCK_SIZE); + auto metadata = make_shared(); // Needed by writer. - metadata->data_n_bytes[0] = 500; - metadata->n_pulses_in_buffer = 1; + metadata->block_start_pulse_id = 0; + metadata->block_stop_pulse_id = BUFFER_BLOCK_SIZE - 1; - WriterH5Writer writer("ignore.h5", n_frames, n_modules); + JFH5Writer writer("ignore.h5", start_pulse_id, stop_pulse_id, n_modules); writer.write(metadata.get(), data.get()); - writer.close_file(); } -TEST(WriterH5Writer, test_compression) +TEST(WriterH5Writer, test_writing) { -// size_t n_modules = 2; -// size_t n_frames = 2; -// -// auto comp_buffer_size = bshuf_compress_lz4_bound( -// MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); -// -// auto f_raw_buffer = make_unique(MODULE_N_PIXELS); -// auto f_comp_buffer = make_unique(comp_buffer_size); -// -// auto i_comp_buffer = make_unique( -// (comp_buffer_size * n_modules) + BSHUF_LZ4_HEADER_BYTES); -// auto i_raw_buffer = make_unique( -// MODULE_N_PIXELS * n_modules * n_frames); -// -// bshuf_write_uint64_BE(&i_comp_buffer[0], -// MODULE_N_BYTES * n_modules); -// bshuf_write_uint32_BE(&i_comp_buffer[8], -// MODULE_N_PIXELS * PIXEL_N_BYTES); -// -// size_t total_compressed_size = BSHUF_LZ4_HEADER_BYTES; -// for (int i_module=0; i_module(); -// metadata->data_n_bytes = total_compressed_size; -// -// metadata->is_good_frame = 1; -// metadata->frame_index = 3; -// metadata->pulse_id = 3; -// metadata->daq_rec = 3; -// -// auto result = bshuf_decompress_lz4( -// &i_comp_buffer[12], &i_raw_buffer[0], -// MODULE_N_PIXELS*n_modules, PIXEL_N_BYTES, MODULE_N_PIXELS); -// -// WriterH5Writer writer("ignore.h5", n_frames, n_modules); -// writer.write(metadata.get(), &i_comp_buffer[0]); -// writer.close_file(); -// -// H5::H5File reader("ignore.h5", H5F_ACC_RDONLY); -// auto image_dataset = reader.openDataSet("image"); -// image_dataset.read(&i_raw_buffer[0], H5::PredType::NATIVE_UINT16); -// -// for (int i_module=0; i_module(); + metadata->block_start_pulse_id = 0; + metadata->block_stop_pulse_id = BUFFER_BLOCK_SIZE - 1; + + for (uint64_t pulse_id=start_pulse_id; + pulse_id<=stop_pulse_id; + pulse_id++) { + + metadata->pulse_id[pulse_id] = pulse_id; + metadata->frame_index[pulse_id] = pulse_id + 10; + metadata->daq_rec[pulse_id] = pulse_id + 100; + metadata->is_good_image[pulse_id] = 1; + } + + + auto image_buffer = make_unique( + MODULE_N_PIXELS * n_modules * BUFFER_BLOCK_SIZE); + + for (int i_block=0; i_block<=BUFFER_BLOCK_SIZE; i_block++) { + for (int i_module=0; i_module(n_images); + auto pulse_id_dataset = reader.openDataSet("pulse_id"); + pulse_id_dataset.read(&pulse_id_data[0], H5::PredType::NATIVE_UINT64); + + auto frame_index_data = make_unique(n_images); + auto frame_index_dataset = reader.openDataSet("frame_index"); + frame_index_dataset.read(&frame_index_data[0], H5::PredType::NATIVE_UINT64); + + auto daq_rec_data = make_unique(n_images); + auto daq_rec_dataset = reader.openDataSet("daq_rec"); + daq_rec_dataset.read(&daq_rec_data[0], H5::PredType::NATIVE_UINT32); + + auto is_good_frame_data = make_unique(n_images); + auto is_good_frame_dataset = reader.openDataSet("is_good_frame"); + is_good_frame_dataset.read( + &is_good_frame_data[0], H5::PredType::NATIVE_UINT8); + + for (uint64_t pulse_id=start_pulse_id; + pulse_id<=stop_pulse_id; + pulse_id++) { + + ASSERT_EQ(pulse_id_data[pulse_id - start_pulse_id], pulse_id); + ASSERT_EQ(frame_index_data[pulse_id - start_pulse_id], pulse_id + 10); + ASSERT_EQ(daq_rec_data[pulse_id - start_pulse_id], pulse_id + 100); + ASSERT_EQ(is_good_frame_data[pulse_id - start_pulse_id], 1); + } } \ No newline at end of file diff --git a/sf-writer/test/test_WriterZmqReceiver.cpp b/sf-writer/test/test_WriterZmqReceiver.cpp deleted file mode 100644 index 402e409..0000000 --- a/sf-writer/test/test_WriterZmqReceiver.cpp +++ /dev/null @@ -1,80 +0,0 @@ -#include -#include "WriterZmqReceiver.hpp" -#include "bitshuffle/bitshuffle.h" -#include -#include -#include "buffer_config.hpp" -#include "zmq.h" - -using namespace std; -using namespace core_buffer; - -TEST(WriterZmqReceiver, basic_test) -{ - size_t n_modules = 4; - uint64_t pulse_id = 12345; - - auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1); - - void* sockets[n_modules]; - for (size_t i = 0; i < n_modules; i++) { - sockets[i] = zmq_socket(ctx, ZMQ_PUSH); - - int linger = 0; - if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, - sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << REPLAY_STREAM_IPC_URL << i; - const auto ipc = ipc_addr.str(); - - if (zmq_bind(sockets[i], ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - } - this_thread::sleep_for(chrono::milliseconds(100)); - - WriterZmqReceiver receiver(ctx, REPLAY_STREAM_IPC_URL, n_modules); - this_thread::sleep_for(chrono::milliseconds(100)); - - size_t compressed_frame_size = 5000; - auto frame_buffer = make_unique(compressed_frame_size); - - ImageMetadata image_metadata; - auto compress_size = bshuf_compress_lz4_bound( - MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); - auto image_buffer = make_unique(compress_size * n_modules); - - for (size_t i = 0; i < n_modules; i++) { - - ReplayModuleFrameBuffer frame_metadata; - frame_metadata.metadata.pulse_id = pulse_id; - frame_metadata.metadata.frame_index = pulse_id + 100; - frame_metadata.metadata.n_received_packets = 128; - frame_metadata.metadata.daq_rec = 4; - - frame_metadata.is_frame_present = 1; - frame_metadata.data_n_bytes = compressed_frame_size; - - zmq_send(sockets[i], - &frame_metadata, - sizeof(ReplayModuleFrameBuffer), - ZMQ_SNDMORE); - - zmq_send(sockets[i], - (char*)(frame_buffer.get()), - compressed_frame_size, - 0); - } - - receiver.get_next_buffer(pulse_id, &image_metadata, image_buffer.get()); - EXPECT_EQ(pulse_id, image_metadata.pulse_id); - EXPECT_EQ(image_metadata.is_good_frame, 1); - EXPECT_EQ(image_metadata.daq_rec, 4); - EXPECT_EQ(image_metadata.data_n_bytes, - 5000*n_modules); -// 5000*n_modules+BSHUF_LZ4_HEADER_BYTES); -} \ No newline at end of file