Merge pull request #3 from paulscherrerinstitute/binary_buffer

Binary buffer
This commit is contained in:
2020-06-03 08:00:08 +02:00
committed by GitHub
59 changed files with 1267 additions and 1373 deletions
-1
View File
@@ -30,6 +30,5 @@ add_subdirectory(
add_subdirectory("core-buffer")
add_subdirectory("sf-buffer")
add_subdirectory("sf-replay")
add_subdirectory("sf-stream")
add_subdirectory("sf-writer")
+3 -1
View File
@@ -124,4 +124,6 @@ In order to unify the way we write code and talk about concept the following
terminology definitions should be followed:
- frame (data from a single module)
- image (data of the assembled image)
- image (data of the assembled image)
- start_pulse_id and stop_pulse_id (not end_pulse_id) is used to determine the
inclusive range (both start and stop pulse_id are included) of pulses.
+6 -6
View File
@@ -19,7 +19,7 @@ namespace core_buffer {
// Must be power of 10 and >= than FILE_MOD.
const size_t FOLDER_MOD = 100000;
// Extension of our file format.
const std::string FILE_EXTENSION = ".h5";
const std::string FILE_EXTENSION = ".bin";
// Number of pulses between each statistics print out.
const size_t STATS_MODULO = 100;
// If the RB is empty, how much time to wait before trying to read it again.
@@ -29,9 +29,9 @@ namespace core_buffer {
const size_t BUFFER_UDP_N_RECV_MSG = 64;
// Size of UDP recv buffer
const int BUFFER_UDP_RCVBUF_N_SLOTS = 100;
// +1 for packet headers.
// 8246 bytes for each UDP packet.
const int BUFFER_UDP_RCVBUF_BYTES =
(128 * 8246 * BUFFER_UDP_RCVBUF_N_SLOTS);
(128 * BUFFER_UDP_RCVBUF_N_SLOTS * 8246);
// Microseconds timeout for UDP recv.
const int BUFFER_UDP_US_TIMEOUT = 2 * 1000;
// HWM for live stream from buffer.
@@ -53,7 +53,7 @@ namespace core_buffer {
// Address where Replay streams and writer receives.
const std::string REPLAY_STREAM_IPC_URL = "ipc:///tmp/sf-replay-";
// How many frames to read at once from file.
const size_t REPLAY_READ_BUFFER_SIZE = 100;
const size_t BUFFER_BLOCK_SIZE = 100;
// How many slots to have in the internal queue between read and send.
const int REPLAY_FASTQUEUE_N_SLOTS = 2;
// How many frames do we buffer in send.
@@ -63,8 +63,8 @@ namespace core_buffer {
const int WRITER_RCVHWM = 100;
// ZMQ threads for receiving data from sf_replay.
const int WRITER_ZMQ_IO_THREADS = 2;
// Size of buffer between the receiving and writing part.
const int WRITER_FASTQUEUE_N_SLOTS = 5;
// MS to retry reading from the image assembler.
const size_t WRITER_IMAGE_ASSEMBLER_RETRY_MS = 5;
// How large are metadata chunks in the HDF5.
const size_t WRITER_METADATA_CHUNK_N_IMAGES = 100;
// How large should the data cache be in N images.
+23 -9
View File
@@ -4,22 +4,36 @@
#include "buffer_config.hpp"
#include "jungfrau.hpp"
struct ImageMetadataBuffer
struct ImageMetadataBlock
{
uint64_t pulse_id[core_buffer::WRITER_DATA_CACHE_N_IMAGES];
uint64_t frame_index[core_buffer::WRITER_DATA_CACHE_N_IMAGES];
uint32_t daq_rec[core_buffer::WRITER_DATA_CACHE_N_IMAGES];
uint8_t is_good_image[core_buffer::WRITER_DATA_CACHE_N_IMAGES];
uint16_t n_images;
uint64_t pulse_id[core_buffer::BUFFER_BLOCK_SIZE];
uint64_t frame_index[core_buffer::BUFFER_BLOCK_SIZE];
uint32_t daq_rec[core_buffer::BUFFER_BLOCK_SIZE];
uint8_t is_good_image[core_buffer::BUFFER_BLOCK_SIZE];
uint64_t block_start_pulse_id;
uint64_t block_stop_pulse_id;
};
const char BUFFER_FORMAT_START_BYTE = 0xBE;
#pragma pack(push)
#pragma pack(1)
struct ReplayBuffer
struct BufferBinaryFormat {
BufferBinaryFormat() : FORMAT_MARKER(BUFFER_FORMAT_START_BYTE) {};
const char FORMAT_MARKER;
ModuleFrame metadata;
char data[core_buffer::MODULE_N_BYTES];
};
#pragma pack(pop)
#pragma pack(push)
#pragma pack(1)
struct BufferBinaryBlock
{
ModuleFrame metadata[core_buffer::REPLAY_READ_BUFFER_SIZE];
BufferBinaryFormat frame[core_buffer::BUFFER_BLOCK_SIZE];
uint64_t start_pulse_id;
uint16_t n_frames;
};
#pragma pack(pop)
+2 -2
View File
@@ -103,7 +103,7 @@ void FastQueue<T>::release()
read_slot_id_ %= n_slots_;
}
template class FastQueue<ImageMetadataBuffer>;
template class FastQueue<ReplayBuffer>;
template class FastQueue<ImageMetadataBlock>;
template class FastQueue<BufferBinaryBlock>;
template class FastQueue<ModuleFrame>;
template class FastQueue<ModuleFrameBuffer>;
+3 -1
View File
@@ -145,4 +145,6 @@ TEST(FaseQueue, array_parameter)
ASSERT_EQ(module_metadata.n_received_packets, i_module);
ASSERT_EQ(module_metadata.module_id, i_module);
}
}
}
// TODO: Test with payload of zero (metadata only).
+20
View File
@@ -0,0 +1,20 @@
#!/bin/bash
if [ $# != 1 ]
then
systemctl start JF01-buffer-worker@{00..02}
exit
fi
M=$1
# Add ourselves to the user cpuset.
# echo $$ > /sys/fs/cgroup/cpuset/user/tasks
coreAssociatedBuffer=(12 12 12)
initialUDPport=50010
port=$((${initialUDPport}+10#${M}))
DETECTOR=JF01T03V01
taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M}
+16
View File
@@ -0,0 +1,16 @@
[Unit]
Description=JF01 UDP2buffer worker instance as a service, instance %i
Requires=JF01-buffer.service
Before=JF01-buffer.service
BindsTo=JF01-buffer.service
[Service]
PermissionsStartOnly=true
Type=idle
User=root
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF01-buffer-worker.sh %i
TimeoutStartSec=10
RestartSec=10
[Install]
WantedBy=JF01-buffer.service
@@ -1,9 +1,9 @@
[Unit]
Description=All replay instances of JF07
Description=All UDP-buffer instances of JF01
[Service]
Type=oneshot
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF07-replay-worker.sh
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF01-buffer-worker.sh
RemainAfterExit=yes
[Install]
+15
View File
@@ -0,0 +1,15 @@
[Unit]
Description=stream service (to streamvis and live analysis) of JF01
[Service]
PermissionsStartOnly=true
Type=idle
User=root
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF01-stream.sh
TimeoutStartSec=10
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
+5
View File
@@ -0,0 +1,5 @@
#!/bin/bash
coreAssociated="24"
taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF01.json
+1 -5
View File
@@ -21,11 +21,7 @@ coreAssociatedBuffer=(4 4 4 4 5 5 5 5 6 6 6 6 7 7 7 7 8 8 8 8 9 9 9 9 10 10 10 1
#coreAssociatedBuffer=(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32)
initialUDPport=50100
# strange that it doesn't work to add 08 or 09 (value too great for base (error token is "09"))
port=$((${initialUDPport}+10#${M}))
#port=`expr ${initialUDPport} + ${M}`
DETECTOR=JF07T32V01
echo ${port}
#taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} > /gpfs/photonics/swissfel/buffer/${port}.log
taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M}
taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M}
-26
View File
@@ -1,26 +0,0 @@
#!/bin/bash
if [ $# != 1 ]
then
systemctl start JF07-replay-worker@{00..32}
exit
fi
M=$1
#8 replay workers per core, last (stream to visualisation) worker occupies 4
coreAssociated=(20 20 20 20 20 20 20 20 21 21 21 21 21 21 21 21 22 22 22 22 22 22 22 22 23 23 23 23 23 23 23 23 24,25,26,27)
latest_file=`cat /gpfs/photonics/swissfel/buffer/JF07T32V01/M00/LATEST`
last_pulse_id=`basename ${latest_file} | sed 's/.h5//'`
first_pulse_id=$((${last_pulse_id}-360000))
echo "First/last pulse_id : ${first_pulse_id} ${last_pulse_id}"
if [ ${M} == 32 ]
then
# taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_writer /gpfs/photonics/swissfel/buffer/test.${first_pulse_id}-${last_pulse_id}.h5 ${first_pulse_id} ${last_pulse_id}
taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 10 tcp://192.168.30.29:9107 30
else
taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_replay /gpfs/photonics/swissfel/buffer/JF07T32V01 M${M} ${M} ${first_pulse_id} ${last_pulse_id}
fi
-16
View File
@@ -1,16 +0,0 @@
[Unit]
Description=JF07 replay worker instance as a service, instance %i
Requires=JF07-replay.service
Before=JF07-replay.service
BindsTo=JF07-replay.service
[Service]
PermissionsStartOnly=true
Type=idle
User=root
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF07-replay-worker.sh %i
TimeoutStartSec=10
RestartSec=10
[Install]
WantedBy=JF07-replay.service
+1 -1
View File
@@ -2,4 +2,4 @@
coreAssociated="20,21,22,23"
taskset -c ${coreAssociated} /usr/bin/sf_stream tcp://129.129.241.42:9007 25 tcp://192.168.30.29:9107 10
taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF07.json
+20
View File
@@ -0,0 +1,20 @@
#!/bin/bash
if [ $# != 1 ]
then
systemctl start JF13-buffer-worker@00
exit
fi
M=$1
# Add ourselves to the user cpuset.
# echo $$ > /sys/fs/cgroup/cpuset/user/tasks
coreAssociatedBuffer=(13)
initialUDPport=50190
port=$((${initialUDPport}+10#${M}))
DETECTOR=JF13T01V01
taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M}
+16
View File
@@ -0,0 +1,16 @@
[Unit]
Description=JF13 UDP2buffer worker instance as a service, instance %i
Requires=JF13-buffer.service
Before=JF13-buffer.service
BindsTo=JF13-buffer.service
[Service]
PermissionsStartOnly=true
Type=idle
User=root
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF13-buffer-worker.sh %i
TimeoutStartSec=10
RestartSec=10
[Install]
WantedBy=JF13-buffer.service
+10
View File
@@ -0,0 +1,10 @@
[Unit]
Description=All UDP-buffer instances of JF13
[Service]
Type=oneshot
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF13-buffer-worker.sh
RemainAfterExit=yes
[Install]
WantedBy=multi-user.target
+15
View File
@@ -0,0 +1,15 @@
[Unit]
Description=stream service (to streamvis and live analysis) of JF13
[Service]
PermissionsStartOnly=true
Type=idle
User=root
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF13-stream.sh
TimeoutStartSec=10
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
+5
View File
@@ -0,0 +1,5 @@
#!/bin/bash
coreAssociated="25"
taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF13.json
+30
View File
@@ -0,0 +1,30 @@
#!/bin/bash
hours=5
threshold=80
if [ $# = 1 ]
then
threshold=$1
fi
if [ $# = 2 ]
then
hours=$2
fi
df -h | grep BUFFER > /dev/null
if [ $? != 0 ]
then
# BUFFER is not present
exit
fi
occupancy=`df -h /gpfs/photonics/swissfel/buffer | grep BUFFER | awk '{print $5}' | sed 's/%//'`
if [ ${occupancy} -lt ${threshold} ]
then
# echo OK, not action
exit
fi
#find /gpfs/photonics/swissfel/buffer/JF* -type f -mmin +$((${hours}*60)) -delete
find /gpfs/photonics/swissfel/buffer/JF*/M* -type d -mmin +$((${hours}*60)) -delete
+46
View File
@@ -0,0 +1,46 @@
#!/bin/bash
if [ $# -lt 1 ]
then
echo "Usage : $0 DETECTOR_NAME <number_of_cycles>"
echo " DETECTOR_NAME: JF07 or JF01..."
echo " number_of_cycles : optional, default 100"
exit
fi
DETECTOR=$1
case ${DETECTOR} in
'JF01')
D=1
;;
'JF07')
D=7
;;
'JF13')
D=13
;;
*)
echo "Unsupported detector"
exit
;;
esac
n_cycles=100
if [ $# == 2 ]
then
n_cycles=$2
fi
export PATH=/home/dbe/miniconda3/bin:$PATH
source deactivate
source activate dia
sls_detector_put ${D}-timing trigger
sls_detector_put ${D}-cycles ${n_cycles}
sls_detector_put ${D}-exptime 5e-06
sls_detector_put ${D}-frames 1
sls_detector_put ${D}-dr 16
#sls_detector_put ${D}-clearbit to 0x5d 0 # normal mode, not highG0
sls_detector_put ${D}-status start
echo "Now start trigger"
+1 -1
View File
@@ -17,4 +17,4 @@ target_link_libraries(sf-buffer
hdf5_cpp)
enable_testing()
add_subdirectory(test/)
add_subdirectory(test/)
-23
View File
@@ -1,23 +0,0 @@
#ifndef JFFILEFORMAT_HPP
#define JFFILEFORMAT_HPP
#include "jungfrau.hpp"
const char JF_FORMAT_START_BYTE = 0xBE;
#pragma pack(push)
#pragma pack(1)
struct BufferBinaryFormat {
BufferBinaryFormat() : FORMAT_MARKER(JF_FORMAT_START_BYTE) {};
const char FORMAT_MARKER;
uint64_t pulse_id;
uint64_t frame_id;
uint32_t daq_rec;
uint16_t n_recv_packets;
char data[JUNGFRAU_DATA_BYTES_PER_FRAME];
};
#pragma pack(pop)
#endif // JFFILEFORMAT_HPP
+5 -4
View File
@@ -2,12 +2,13 @@
#define BINARYWRITER_HPP
#include <string>
#include "BufferBinaryFormat.hpp"
#include "formats.hpp"
class BufferBinaryWriter {
const std::string device_name_;
const std::string root_folder_;
const std::string device_name_;
std::string latest_filename_;
std::string current_output_filename_;
@@ -19,8 +20,8 @@ class BufferBinaryWriter {
public:
BufferBinaryWriter(
const std::string& device_name,
const std::string& root_folder);
const std::string& root_folder,
const std::string& device_name);
virtual ~BufferBinaryWriter();
+3 -2
View File
@@ -6,9 +6,10 @@
#include <string>
#include <H5Cpp.h>
#include <memory>
#include "jungfrau.hpp"
#include <unordered_map>
#include <buffer_config.hpp>
#include "jungfrau.hpp"
#include "buffer_config.hpp"
class BufferH5Writer {
+14 -9
View File
@@ -1,21 +1,23 @@
#include "BufferBinaryWriter.hpp"
#include <unistd.h>
#include <iostream>
#include "date.h"
#include <cerrno>
#include <chrono>
#include <cstring>
#include <BufferUtils.hpp>
#include <fcntl.h>
#include <WriterUtils.hpp>
#include "BufferUtils.hpp"
#include "WriterUtils.hpp"
using namespace std;
BufferBinaryWriter::BufferBinaryWriter(
const string& device_name,
const string& root_folder) :
device_name_(device_name),
const string& root_folder,
const string& device_name):
root_folder_(root_folder),
device_name_(device_name),
latest_filename_(root_folder + "/" + device_name + "/LATEST"),
current_output_filename_(""),
output_file_fd_(-1)
@@ -27,7 +29,9 @@ BufferBinaryWriter::~BufferBinaryWriter()
close_current_file();
}
void BufferBinaryWriter::write(uint64_t pulse_id, const BufferBinaryFormat* buffer)
void BufferBinaryWriter::write(
uint64_t pulse_id,
const BufferBinaryFormat* buffer)
{
auto current_frame_file =
BufferUtils::get_filename(root_folder_, device_name_, pulse_id);
@@ -37,7 +41,8 @@ void BufferBinaryWriter::write(uint64_t pulse_id, const BufferBinaryFormat* buff
}
size_t n_bytes_offset =
BufferUtils::get_file_frame_index(pulse_id) * sizeof(BufferBinaryFormat);
BufferUtils::get_file_frame_index(pulse_id) *
sizeof(BufferBinaryFormat);
auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET);
if (lseek_result < 0) {
@@ -118,7 +123,7 @@ void BufferBinaryWriter::close_current_file()
BufferUtils::update_latest_file(
latest_filename_, current_output_filename_);
}
current_output_filename_ = "";
current_output_filename_ = "";
}
}
+2 -1
View File
@@ -84,7 +84,8 @@ void BufferH5Writer::set_pulse_id(const uint64_t pulse_id)
if (h5_file_.getId() != -1) {
auto latest_filename = output_filename_;
close_file();
BufferUtils::update_latest_file(LATEST_filename_, latest_filename);
BufferUtils::update_latest_file(
LATEST_filename_, latest_filename);
}
WriterUtils::create_destination_folder(new_output_filename);
+3
View File
@@ -1,5 +1,6 @@
#include <iostream>
#include <unistd.h>
#include <filesystem>
#include "WriterUtils.hpp"
#include "date.h"
@@ -41,8 +42,10 @@ void WriterUtils::create_destination_folder(const string& output_file)
auto file_separator_index = output_file.rfind('/');
if (file_separator_index != string::npos) {
string output_folder(output_file.substr(0, file_separator_index));
// TODO: filesystem::create_directories(output_folder)
string create_folder_command("mkdir -p " + output_folder);
system(create_folder_command.c_str());
}
+24 -20
View File
@@ -1,6 +1,6 @@
#include <iostream>
#include <stdexcept>
#include <BufferH5Writer.hpp>
#include <BufferBinaryWriter.hpp>
#include "zmq.h"
#include "buffer_config.hpp"
#include "jungfrau.hpp"
@@ -11,16 +11,18 @@
#include <sys/resource.h>
#include <syscall.h>
#include <zconf.h>
#include "formats.hpp"
using namespace std;
using namespace core_buffer;
int main (int argc, char *argv[]) {
if (argc != 5) {
if (argc != 6) {
cout << endl;
cout << "Usage: sf_buffer [device_name] [udp_port] [root_folder]";
cout << "Usage: sf_buffer [detector_name] [device_name] [udp_port] [root_folder]";
cout << "[source_id]";
cout << endl;
cout << "\tdetector_name: Detector name, example JF07T32V01" << endl;
cout << "\tdevice_name: Name to write to disk.";
cout << "\tudp_port: UDP port to connect to." << endl;
cout << "\troot_folder: FS root folder." << endl;
@@ -30,13 +32,15 @@ int main (int argc, char *argv[]) {
exit(-1);
}
string device_name = string(argv[1]);
int udp_port = atoi(argv[2]);
string root_folder = string(argv[3]);
int source_id = atoi(argv[4]);
string detector_name = string(argv[1]);
string device_name = string(argv[2]);
int udp_port = atoi(argv[3]);
string root_folder = string(argv[4]);
int source_id = atoi(argv[5]);
stringstream ipc_stream;
ipc_stream << BUFFER_LIVE_IPC_URL << source_id;
string LIVE_IPC_URL = BUFFER_LIVE_IPC_URL + detector_name + "-";
ipc_stream << LIVE_IPC_URL << source_id;
const auto ipc_address = ipc_stream.str();
auto ctx = zmq_ctx_new();
@@ -59,7 +63,7 @@ int main (int argc, char *argv[]) {
uint64_t n_corrupted_frames = 0;
uint64_t last_pulse_id = 0;
BufferH5Writer writer(root_folder, device_name);
BufferBinaryWriter writer(root_folder, device_name);
BufferUdpReceiver receiver(udp_port, source_id);
pid_t tid;
@@ -67,8 +71,7 @@ int main (int argc, char *argv[]) {
int ret = setpriority(PRIO_PROCESS, tid, 0);
if (ret == -1) throw runtime_error("cannot set nice");
ModuleFrame metadata;
auto frame_buffer = new char[MODULE_N_BYTES * JUNGFRAU_N_MODULES];
BufferBinaryFormat* binary_buffer = new BufferBinaryFormat();
size_t write_total_us = 0;
size_t write_max_us = 0;
@@ -77,12 +80,12 @@ int main (int argc, char *argv[]) {
while (true) {
auto pulse_id = receiver.get_frame_from_udp(metadata, frame_buffer);
auto pulse_id = receiver.get_frame_from_udp(
binary_buffer->metadata, binary_buffer->data);
auto start_time = chrono::steady_clock::now();
writer.set_pulse_id(pulse_id);
writer.write(&metadata, frame_buffer);
writer.write(pulse_id, binary_buffer);
auto write_end_time = chrono::steady_clock::now();
auto write_us_duration = chrono::duration_cast<chrono::microseconds>(
@@ -90,8 +93,9 @@ int main (int argc, char *argv[]) {
start_time = chrono::steady_clock::now();
zmq_send(socket, &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE);
zmq_send(socket, frame_buffer, MODULE_N_BYTES, 0);
zmq_send(socket, &(binary_buffer->metadata), sizeof(ModuleFrame),
ZMQ_SNDMORE);
zmq_send(socket, binary_buffer->data, MODULE_N_BYTES, 0);
auto send_end_time = chrono::steady_clock::now();
auto send_us_duration = chrono::duration_cast<chrono::microseconds>(
@@ -110,9 +114,9 @@ int main (int argc, char *argv[]) {
send_max_us = send_us_duration;
}
if (metadata.n_received_packets < JF_N_PACKETS_PER_FRAME) {
n_missed_packets +=
JF_N_PACKETS_PER_FRAME - metadata.n_received_packets;
if (binary_buffer->metadata.n_received_packets < JF_N_PACKETS_PER_FRAME) {
n_missed_packets += JF_N_PACKETS_PER_FRAME -
binary_buffer->metadata.n_received_packets;
n_corrupted_frames++;
}
@@ -146,5 +150,5 @@ int main (int argc, char *argv[]) {
}
}
delete[] frame_buffer;
delete binary_buffer;
}
-21
View File
@@ -1,21 +0,0 @@
file(GLOB SOURCES
src/*.cpp)
add_library(sf-replay-lib STATIC ${SOURCES})
target_include_directories(sf-replay-lib PUBLIC include/)
target_link_libraries(sf-replay-lib
external
core-buffer-lib)
add_executable(sf-replay src/main.cpp)
set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay)
target_link_libraries(sf-replay
core-buffer-lib
sf-replay-lib
zmq
hdf5
hdf5_cpp
pthread)
enable_testing()
add_subdirectory(test/)
-34
View File
@@ -1,34 +0,0 @@
#ifndef SF_DAQ_BUFFER_REPLAYH5READER_HPP
#define SF_DAQ_BUFFER_REPLAYH5READER_HPP
#include <string>
#include <H5Cpp.h>
#include <memory>
#include "formats.hpp"
class ReplayH5Reader {
const std::string device_;
const std::string channel_name_;
H5::H5File current_file_;
std::string current_filename_;
H5::DataSet dset_metadata_;
H5::DataSet dset_frame_;
public:
ReplayH5Reader(
const std::string device,
const std::string channel_name);
virtual ~ReplayH5Reader();
void close_file();
void get_buffer(
const uint64_t pulse_id,
ReplayBuffer* metadata,
char* frame_buffer);
};
#endif //SF_DAQ_BUFFER_REPLAYH5READER_HPP
-23
View File
@@ -1,23 +0,0 @@
#ifndef SF_DAQ_BUFFER_REPLAYZMQSENDER_HPP
#define SF_DAQ_BUFFER_REPLAYZMQSENDER_HPP
#include <string>
#include <jungfrau.hpp>
#include <formats.hpp>
class ReplayZmqSender {
void* ctx_;
void* socket_;
public:
ReplayZmqSender(const std::string& ipc_id, const int source_id);
virtual ~ReplayZmqSender();
void close();
void send(const ReplayBuffer* metadata, const char* data);
};
#endif //SF_DAQ_BUFFER_REPLAYZMQSENDER_HPP
-89
View File
@@ -1,89 +0,0 @@
#include "ReplayH5Reader.hpp"
#include "BufferUtils.hpp"
using namespace std;
using namespace core_buffer;
ReplayH5Reader::ReplayH5Reader(
const string device,
const string channel_name) :
device_(device),
channel_name_(channel_name)
{
}
ReplayH5Reader::~ReplayH5Reader()
{
close_file();
}
void ReplayH5Reader::close_file()
{
if (current_file_.getId() != -1) {
dset_metadata_.close();
dset_frame_.close();
current_file_.close();
}
}
void ReplayH5Reader::get_buffer(
const uint64_t pulse_id,
ReplayBuffer* metadata,
char* data)
{
auto pulse_filename = BufferUtils::get_filename(
device_, channel_name_, pulse_id);
if (pulse_filename != current_filename_) {
close_file();
current_filename_ = pulse_filename;
current_file_ = H5::H5File(current_filename_, H5F_ACC_RDONLY);
dset_metadata_ = current_file_.openDataSet(BUFFER_H5_METADATA_DATASET);
dset_frame_ = current_file_.openDataSet(BUFFER_H5_FRAME_DATASET);
}
auto file_index = BufferUtils::get_file_frame_index(pulse_id);
auto cache_start_index = file_index / REPLAY_READ_BUFFER_SIZE;
cache_start_index *= REPLAY_READ_BUFFER_SIZE;
uint64_t b_start_pulse_id = pulse_id - (file_index - cache_start_index);
metadata->start_pulse_id = b_start_pulse_id;
metadata->n_frames = REPLAY_READ_BUFFER_SIZE;
hsize_t b_m_dims[2] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS};
H5::DataSpace b_m_space (2, b_m_dims);
hsize_t b_m_count[] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS};
hsize_t b_m_start[] = {cache_start_index, 0};
b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start);
hsize_t f_m_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS};
H5::DataSpace f_m_space (2, f_m_dims);
hsize_t f_m_count[] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS};
hsize_t pulse_id_start[] = {cache_start_index, 0};
f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, pulse_id_start);
dset_metadata_.read(
&metadata->metadata[0],
H5::PredType::NATIVE_UINT64, b_m_space, f_m_space);
hsize_t b_f_dims[3] =
{REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace b_f_space (3, b_f_dims);
hsize_t b_f_count[] =
{REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE};
hsize_t b_f_start[] = {0, 0, 0};
b_f_space.selectHyperslab(H5S_SELECT_SET, b_f_count, b_f_start);
hsize_t f_frame_dims[3] = {FILE_MOD, MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace f_f_space (3, f_frame_dims);
hsize_t f_f_count[] =
{REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE};
hsize_t f_f_start[] = {cache_start_index, 0, 0};
f_f_space.selectHyperslab(H5S_SELECT_SET, f_f_count, f_f_start);
dset_frame_.read(
data, H5::PredType::NATIVE_UINT16, b_f_space, f_f_space);
}
-48
View File
@@ -1,48 +0,0 @@
#include "ReplayZmqSender.hpp"
#include <sstream>
#include <zmq.h>
#include "buffer_config.hpp"
using namespace std;
using namespace core_buffer;
ReplayZmqSender::ReplayZmqSender(const string& ipc_id, const int source_id)
{
auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-";
stringstream ipc_stream;
ipc_stream << ipc_base << source_id;
const auto ipc_address = ipc_stream.str();
ctx_ = zmq_ctx_new();
socket_ = zmq_socket(ctx_, ZMQ_PUSH);
const int sndhwm = REPLAY_SNDHWM;
if (zmq_setsockopt(socket_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0)
throw runtime_error(zmq_strerror (errno));
const int linger_ms = -1;
if (zmq_setsockopt(socket_, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0)
throw runtime_error(zmq_strerror (errno));
if (zmq_bind(socket_, ipc_address.c_str()) != 0)
throw runtime_error(zmq_strerror (errno));
}
ReplayZmqSender::~ReplayZmqSender()
{
close();
}
void ReplayZmqSender::close() {
zmq_close(socket_);
zmq_ctx_destroy(ctx_);
}
void ReplayZmqSender::send(const ReplayBuffer* metadata, const char* data)
{
zmq_send(socket_, metadata, sizeof(ReplayBuffer), ZMQ_SNDMORE);
zmq_send(socket_, data, MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE, 0);
}
-135
View File
@@ -1,135 +0,0 @@
#include <iostream>
#include <thread>
#include <FastQueue.hpp>
#include <cstring>
#include "buffer_config.hpp"
#include "ReplayH5Reader.hpp"
#include "ReplayZmqSender.hpp"
using namespace std;
using namespace core_buffer;
using namespace chrono;
void sf_replay (
const string device,
const string channel_name,
FastQueue<ReplayBuffer>& queue,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id
)
{
ReplayH5Reader file_reader(device, channel_name);
// "<= stop_pulse_id" because we include the stop_pulse_id in the file.
for (uint64_t curr_pulse_id=start_pulse_id;
curr_pulse_id <= stop_pulse_id;
curr_pulse_id++) {
int slot_id;
while((slot_id = queue.reserve()) == -1) {
this_thread::sleep_for(chrono::milliseconds(
RB_READ_RETRY_INTERVAL_MS));
}
auto start_time = steady_clock::now();
auto metadata = queue.get_metadata_buffer(slot_id);
auto buffer = queue.get_data_buffer(slot_id);
file_reader.get_buffer(curr_pulse_id, metadata, buffer);
auto end_time = steady_clock::now();
uint64_t read_us_duration =
duration_cast<microseconds>(end_time-start_time).count();
queue.commit();
// TODO: Proper statistics
cout << "sf_replay:avg_read_us ";
cout << read_us_duration / REPLAY_READ_BUFFER_SIZE << endl;
}
}
int main (int argc, char *argv[]) {
if (argc != 7) {
cout << endl;
cout << "Usage: sf_replay [ipc_id] [device]";
cout << " [channel_name] [source_id] [start_pulse_id] [stop_pulse_id]";
cout << endl;
cout << "\tdevice: Name of detector." << endl;
cout << "\tchannel_name: M00-M31 for JF16M." << endl;
cout << "\tsource_id: Module index" << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
cout << endl;
exit(-1);
}
const string ipc_id = string(argv[1]);
const string device = string(argv[2]);
const string channel_name = string(argv[3]);
const auto source_id = atoi(argv[4]);
const auto start_pulse_id = (uint64_t) atoll(argv[5]);
const auto stop_pulse_id = (uint64_t) atoll(argv[6]);
FastQueue<ReplayBuffer> queue(
MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE,
REPLAY_FASTQUEUE_N_SLOTS);
thread file_read_thread(sf_replay,
device, channel_name, ref(queue),
start_pulse_id, stop_pulse_id);
uint64_t send_us = 0;
uint64_t max_send_us = 0;
uint64_t n_stats = 0;
ReplayZmqSender sender(ipc_id, source_id);
// "<= stop_pulse_id" because we include the stop_pulse_id in the file.
for (uint64_t curr_pulse_id=start_pulse_id;
curr_pulse_id <= stop_pulse_id;
curr_pulse_id++) {
int slot_id;
while((slot_id = queue.read()) == -1) {
this_thread::sleep_for(chrono::milliseconds(
RB_READ_RETRY_INTERVAL_MS));
}
auto m_buffer = queue.get_metadata_buffer(slot_id);
auto f_buffer = queue.get_data_buffer(slot_id);
auto start_time = steady_clock::now();
sender.send(m_buffer, f_buffer);
auto end_time = steady_clock::now();
uint64_t send_us_duration =
duration_cast<microseconds>(end_time-start_time).count();
queue.release();
// TODO: Proper statistics
n_stats++;
send_us += send_us_duration;
max_send_us = max(max_send_us, send_us_duration);
if (n_stats == STATS_MODULO) {
cout << "sf_replay:avg_send_us " << send_us / STATS_MODULO;
cout << " sf_replay:max_send_us " << max_send_us;
cout << endl;
n_stats = 0;
send_us = 0;
max_send_us = 0;
}
}
file_read_thread.join();
return 0;
}
-10
View File
@@ -1,10 +0,0 @@
add_executable(sf-replay-tests main.cpp)
target_link_libraries(sf-replay-tests
core-buffer-lib
sf-buffer-lib
sf-replay-lib
hdf5
hdf5_cpp
gtest
)
-9
View File
@@ -1,9 +0,0 @@
#include "gtest/gtest.h"
#include "test_ReplayH5Reader.cpp"
using namespace std;
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
-73
View File
@@ -1,73 +0,0 @@
#include <gtest/gtest.h>
#include <thread>
#include "ReplayH5Reader.hpp"
#include "BufferH5Writer.hpp"
using namespace std;
using namespace core_buffer;
TEST(ReplayH5Reader, basic_interaction)
{
auto root_folder = ".";
auto device_name = "fast_device";
size_t pulse_id = 65;
uint16_t source_id = 124;
// This 2 must be compatible by design.
BufferH5Writer writer(root_folder, device_name);
ReplayH5Reader reader(root_folder, device_name);
ModuleFrame w_metadata;
ModuleFrame* r_metadata;
auto w_frame_buffer = make_unique<uint16_t[]>(MODULE_N_PIXELS);
char* r_frame_buffer;
// Setup test values.
w_metadata.pulse_id = pulse_id;
w_metadata.frame_index = 2;
w_metadata.daq_rec = 3;
w_metadata.n_received_packets = 128;
w_metadata.module_id = source_id;
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
w_frame_buffer[i] = i % 100;
}
// Write to file.
writer.set_pulse_id(pulse_id);
writer.write(&w_metadata, (char*)&(w_frame_buffer[0]));
writer.close_file();
reader.get_buffer(pulse_id, r_metadata, r_frame_buffer);
ASSERT_EQ(r_metadata->pulse_id, pulse_id);
ASSERT_EQ(r_metadata->module_id, source_id);
ASSERT_EQ(r_metadata->frame_index, 2);
ASSERT_EQ(r_metadata->daq_rec, 3);
ASSERT_EQ(r_metadata->n_received_packets, 128);
// Data as well.
auto offset = MODULE_N_PIXELS * (pulse_id-1);
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
w_frame_buffer[i] = r_frame_buffer[offset + i];
}
for (uint64_t i_pulse=0; i_pulse<100; i_pulse++) {
// Verify that all but the saved pulse_id are zero.
if (i_pulse == pulse_id) {
continue;
}
reader.get_buffer(i_pulse, r_metadata, r_frame_buffer);
ASSERT_EQ(r_metadata->pulse_id, 0);
ASSERT_EQ(r_metadata->frame_index, 0);
ASSERT_EQ(r_metadata->daq_rec, 0);
ASSERT_EQ(r_metadata->n_received_packets, 0);
ASSERT_EQ(r_metadata->module_id, 0);
}
reader.close_file();
}
+4 -2
View File
@@ -10,9 +10,11 @@ target_link_libraries(sf-stream-lib
add_executable(sf-stream src/main.cpp)
set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream)
target_link_libraries(sf-stream
external
sf-stream-lib
zmq
pthread)
pthread
jsoncpp)
enable_testing()
add_subdirectory(test/)
add_subdirectory(test/)
+89 -41
View File
@@ -10,35 +10,45 @@
#include <cstring>
#include <zmq.h>
#include <LiveRecvModule.hpp>
#include "date.h"
#include <jsoncpp/json/json.h>
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
#include <rapidjson/istreamwrapper.h>
#include <fstream>
using namespace std;
using namespace core_buffer;
int main (int argc, char *argv[])
{
if (argc != 5) {
if (argc != 2) {
cout << endl;
cout << "Usage: sf_stream ";
cout << " [streamvis_address] [reduction_factor_streamvis]";
cout << " [live_analysis_address] [reduction_factor_live_analysis]";
cout << " [config_json_file]";
cout << endl;
cout << "\tstreamvis_address: address to streamvis, example tcp://129.129.241.42:9007" << endl;
cout << "\treduction_factor_streamvis: 1 out of N (example 10) images to send to streamvis. For remaining send metadata." << endl;
cout << "\tlive_analysis_address: address to live_analysis, example tcp://129.129.241.42:9107" << endl;
cout << "\treduction_factor_live_analysis: 1 out of N (example 10) images to send to live analysis. For remaining send metadata. N<=1 - send every image" << endl;
cout << "\tconfig_json_file: json file with the configuration parameters(detector name, number of modules, pedestal and gain files" << endl;
cout << endl;
exit(-1);
}
string streamvis_address = string(argv[1]);
int reduction_factor_streamvis = (int) atoll(argv[2]);
string live_analysis_address = string(argv[3]);
int reduction_factor_live_analysis = (uint64_t) atoll(argv[4]);
string config_json_file = string(argv[1]);
ifstream ifs(config_json_file);
rapidjson::IStreamWrapper isw(ifs);
rapidjson::Document config_parameters;
config_parameters.ParseStream(isw);
string streamvis_address = config_parameters["streamvis_stream"].GetString();
int reduction_factor_streamvis = config_parameters["streamvis_rate"].GetInt();
string live_analysis_address = config_parameters["live_stream"].GetString();
int reduction_factor_live_analysis = config_parameters["live_rate"].GetInt();
const string PEDE_FILENAME = config_parameters["pedestal_file"].GetString();
const string GAIN_FILENAME = config_parameters["gain_file"].GetString();
const string DETECTOR_NAME = config_parameters["detector_name"].GetString();
size_t n_modules = config_parameters["n_modules"].GetInt();
size_t n_modules = 32;
FastQueue<ModuleFrameBuffer> queue(
n_modules * MODULE_N_BYTES,
STREAM_FASTQUEUE_SLOTS);
@@ -46,7 +56,9 @@ int main (int argc, char *argv[])
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL);
const string LIVE_IPC_URL = BUFFER_LIVE_IPC_URL+DETECTOR_NAME+"-";
LiveRecvModule recv_module(queue, n_modules, ctx, LIVE_IPC_URL);
// 0mq sockets to streamvis and live analysis
void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB);
@@ -59,8 +71,8 @@ int main (int argc, char *argv[])
}
uint16_t data_empty [] = { 0, 0, 0, 0};
Json::Value header;
Json::StreamWriterBuilder builder;
// TODO: Remove stats trash.
int stats_counter = 0;
@@ -70,6 +82,10 @@ int main (int argc, char *argv[])
while (true) {
rapidjson::Document header(rapidjson::kObjectType);
auto& header_alloc = header.GetAllocator();
string text_header;
auto start_time = chrono::steady_clock::now();
auto slot_id = queue.read();
@@ -112,39 +128,63 @@ int main (int argc, char *argv[])
}
}
//Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
// TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
header["frame"] = (Json::Value::UInt64)frame_index;
header["is_good_frame"] = is_good_frame;
header["daq_rec"] = (Json::Value::UInt64)daq_rec;
header["pulse_id"] = (Json::Value::UInt64)pulse_id;
header.AddMember("frame", frame_index, header_alloc);
header.AddMember("is_good_frame", is_good_frame, header_alloc);
header.AddMember("daq_rec", daq_rec, header_alloc);
header.AddMember("pulse_id", pulse_id, header_alloc);
//this needs to be re-read from external source
header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5";
header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5";
rapidjson::Value pedestal_file;
pedestal_file.SetString(PEDE_FILENAME.c_str(), header_alloc);
header.AddMember("pedestal_file", pedestal_file, header_alloc);
header["number_frames_expected"] = 10000;
header["run_name"] = to_string(uint64_t(pulse_id/10000)*10000);
rapidjson::Value gain_file;
gain_file.SetString(GAIN_FILENAME.c_str(), header_alloc);
header.AddMember("gain_file", gain_file, header_alloc);
// detector name should come as parameter to sf_stream
header["detector_name"] = "JF07T32V01";
header.AddMember("number_frames_expected", 10000, header_alloc);
header["htype"] = "array-1.0";
header["type"] = "uint16";
rapidjson::Value run_name;
run_name.SetString(
to_string(uint64_t(pulse_id/10000)*10000).c_str(),
header_alloc);
header.AddMember("run_name", run_name, header_alloc);
rapidjson::Value detector_name;
detector_name.SetString(DETECTOR_NAME.c_str(), header_alloc);
header.AddMember("detector_name", detector_name, header_alloc);
header.AddMember("htype", "array-1.0", header_alloc);
header.AddMember("type", "uint16", header_alloc);
// To be retrieved and filled with correct values down.
auto shape_value = rapidjson::Value(rapidjson::kArrayType);
shape_value.PushBack((uint64_t)0, header_alloc);
shape_value.PushBack((uint64_t)0, header_alloc);
header.AddMember("shape", shape_value, header_alloc);
int send_streamvis = 0;
if ( reduction_factor_streamvis > 1 ) {
send_streamvis = rand() % reduction_factor_streamvis;
}
if ( send_streamvis == 0 ) {
header["shape"][0] = 16384;
header["shape"][1] = 1024;
auto& shape = header["shape"];
shape[0] = n_modules*512;
shape[1] = 1024;
} else{
header["shape"][0] = 2;
header["shape"][1] = 2;
auto& shape = header["shape"];
shape[0] = 2;
shape[1] = 2;
}
string text_header = Json::writeString(builder, header);
{
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
header.Accept(writer);
text_header = buffer.GetString();
}
zmq_send(socket_streamvis,
text_header.c_str(),
@@ -169,14 +209,22 @@ int main (int argc, char *argv[])
send_live_analysis = rand() % reduction_factor_live_analysis;
}
if ( send_live_analysis == 0 ) {
header["shape"][0] = 16384;
header["shape"][1] = 1024;
auto& shape = header["shape"];
shape[0] = n_modules*512;
shape[1] = 1024;
} else{
header["shape"][0] = 2;
header["shape"][1] = 2;
auto& shape = header["shape"];
shape[0] = 2;
shape[1] = 2;
}
text_header = Json::writeString(builder, header);
{
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
header.Accept(writer);
text_header = buffer.GetString();
}
zmq_send(socket_live,
text_header.c_str(),
+1
View File
@@ -13,6 +13,7 @@ target_link_libraries(sf-writer
sf-writer-lib
zmq
hdf5
hdf5_hl
hdf5_cpp
pthread
)
+28
View File
@@ -0,0 +1,28 @@
#ifndef SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP
#define SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP
#include <formats.hpp>
class BufferBinaryReader {
const std::string device_;
const std::string channel_name_;
std::string current_input_file_;
int input_file_fd_;
void open_file(const std::string& filename);
void close_current_file();
public:
BufferBinaryReader(const std::string &device,
const std::string &channel_name);
virtual ~BufferBinaryReader();
void get_block(const uint64_t block_id, BufferBinaryBlock *buffer);
};
#endif //SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP
+41
View File
@@ -0,0 +1,41 @@
#ifndef SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP
#define SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP
#include <atomic>
#include "formats.hpp"
const size_t IA_N_SLOTS = 2;
class ImageAssembler {
const size_t n_modules_;
const size_t image_buffer_slot_n_bytes_;
char* image_buffer_;
ImageMetadataBlock* meta_buffer_;
ModuleFrame* frame_meta_buffer_;
std::atomic_int* buffer_status_;
size_t get_data_offset(const uint64_t slot_id, const int i_module);
size_t get_metadata_offset(const uint64_t slot_id, const int i_module);
public:
ImageAssembler(const size_t n_modules);
virtual ~ImageAssembler();
bool is_slot_free(const uint64_t bunch_id);
bool is_slot_full(const uint64_t bunch_id);
void process(uint64_t bunch_id,
const int i_module,
const BufferBinaryBlock* block_buffer);
void free_slot(const uint64_t bunch_id);
ImageMetadataBlock* get_metadata_buffer(const uint64_t bunch_id);
char* get_data_buffer(const uint64_t bunch_id);
};
#endif //SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP
+40
View File
@@ -0,0 +1,40 @@
#ifndef SFWRITER_HPP
#define SFWRITER_HPP
#include <memory>
#include <string>
#include <H5Cpp.h>
#include "buffer_config.hpp"
#include "formats.hpp"
class JFH5Writer {
const uint64_t start_pulse_id_;
const uint64_t stop_pulse_id_;
const size_t n_modules_;
const size_t n_images_;
size_t current_write_index_;
H5::H5File file_;
H5::DataSet image_dataset_;
void close_file();
public:
JFH5Writer(const std::string& output_file,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id,
const size_t n_modules);
~JFH5Writer();
void write(const ImageMetadataBlock* metadata, const char* data);
uint64_t* b_pulse_id_;
uint64_t* b_frame_index_;
uint32_t* b_daq_rec_;
uint8_t* b_is_good_frame_ ;
};
#endif //SFWRITER_HPP
-35
View File
@@ -1,35 +0,0 @@
#ifndef SFWRITER_HPP
#define SFWRITER_HPP
#include <memory>
#include <string>
#include <H5Cpp.h>
#include "buffer_config.hpp"
#include "formats.hpp"
class WriterH5Writer {
const size_t n_frames_;
const size_t n_modules_;
size_t current_write_index_;
H5::H5File file_;
H5::DataSet image_dataset_;
H5::DataSet pulse_id_dataset_;
H5::DataSet frame_index_dataset_;
H5::DataSet daq_rec_dataset_;
H5::DataSet is_good_frame_dataset_;
public:
WriterH5Writer(const std::string& output_file,
const size_t n_frames,
const size_t n_modules);
~WriterH5Writer();
void write(const ImageMetadataBuffer* metadata, const char* data);
void close_file();
};
#endif //SFWRITER_HPP
-34
View File
@@ -1,34 +0,0 @@
#ifndef SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
#define SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
#include <string>
#include "WriterH5Writer.hpp"
#include <vector>
#include <jungfrau.hpp>
class WriterZmqReceiver {
const size_t n_modules_;
std::vector<void*> sockets_;
const uint64_t stop_pulse_id_;
ModuleFrame f_meta_;
public:
WriterZmqReceiver(
void *ctx,
const std::string& ipc_prefix,
const size_t n_modules,
const uint64_t stop_pulse_id);
virtual ~WriterZmqReceiver();
void get_next_buffer(
const uint64_t start_pulse_id,
ImageMetadataBuffer* i_meta,
char* image_buffer);
};
#endif //SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
+123
View File
@@ -0,0 +1,123 @@
#include "BufferBinaryReader.hpp"
#include <unistd.h>
#include <sstream>
#include <date.h>
#include <cstring>
#include <fcntl.h>
#include "BufferUtils.hpp"
using namespace std;
using namespace core_buffer;
BufferBinaryReader::BufferBinaryReader(
const std::string &device,
const std::string &channel_name) :
device_(device),
channel_name_(channel_name),
current_input_file_(""),
input_file_fd_(-1)
{
}
BufferBinaryReader::~BufferBinaryReader()
{
close_current_file();
}
void BufferBinaryReader::get_block(
const uint64_t block_id, BufferBinaryBlock* buffer)
{
uint64_t block_start_pulse_id = block_id * BUFFER_BLOCK_SIZE;
auto current_block_file = BufferUtils::get_filename(
device_, channel_name_, block_start_pulse_id);
if (current_block_file != current_input_file_) {
open_file(current_block_file);
}
size_t file_start_index =
BufferUtils::get_file_frame_index(block_start_pulse_id);
size_t n_bytes_offset = file_start_index * sizeof(BufferBinaryFormat);
auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET);
if (lseek_result < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryReader::get_block]";
err_msg << " Error while lseek on file ";
err_msg << current_input_file_;
err_msg << " for n_bytes_offset ";
err_msg << n_bytes_offset << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
auto n_bytes = ::read(input_file_fd_, buffer,
sizeof(BufferBinaryFormat) * BUFFER_BLOCK_SIZE);
if (n_bytes < sizeof(BufferBinaryFormat)) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryReader::get_block]";
err_msg << " Error while reading from file ";
err_msg << current_input_file_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
}
void BufferBinaryReader::open_file(const std::string& filename)
{
close_current_file();
input_file_fd_ = open(filename.c_str(), O_RDONLY);
if (input_file_fd_ < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryReader::open_file]";
err_msg << " Cannot open file ";
err_msg << filename << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
current_input_file_ = filename;
}
void BufferBinaryReader::close_current_file()
{
if (input_file_fd_ != -1) {
if (close(input_file_fd_) < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BinaryWriter::close_current_file]";
err_msg << " Error while closing file ";
err_msg << current_input_file_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
input_file_fd_ = -1;
current_input_file_ = "";
}
}
+171
View File
@@ -0,0 +1,171 @@
#include <cstring>
#include "ImageAssembler.hpp"
using namespace std;
using namespace core_buffer;
ImageAssembler::ImageAssembler(const size_t n_modules) :
n_modules_(n_modules),
image_buffer_slot_n_bytes_(BUFFER_BLOCK_SIZE * MODULE_N_BYTES * n_modules_)
{
image_buffer_ = new char[IA_N_SLOTS * image_buffer_slot_n_bytes_];
meta_buffer_ = new ImageMetadataBlock[IA_N_SLOTS];
frame_meta_buffer_ =
new ModuleFrame[IA_N_SLOTS * n_modules * BUFFER_BLOCK_SIZE];
buffer_status_ = new atomic_int[IA_N_SLOTS];
for (size_t i=0; i<IA_N_SLOTS; i++) {
free_slot(i);
}
}
ImageAssembler::~ImageAssembler()
{
delete[] image_buffer_;
delete[] meta_buffer_;
}
bool ImageAssembler::is_slot_free(const uint64_t bunch_id)
{
auto slot_id = bunch_id % IA_N_SLOTS;
return buffer_status_[slot_id].load() > 0;
}
bool ImageAssembler::is_slot_full(const uint64_t bunch_id)
{
auto slot_id = bunch_id % IA_N_SLOTS;
return buffer_status_[slot_id].load() == 0;
}
size_t ImageAssembler::get_data_offset(
const uint64_t slot_id, const int i_module)
{
size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_;
size_t module_i_offset = i_module * MODULE_N_BYTES;
return slot_i_offset + module_i_offset;
}
size_t ImageAssembler::get_metadata_offset(
const uint64_t slot_id, const int i_module)
{
size_t n_metadata_in_slot = n_modules_ * BUFFER_BLOCK_SIZE;
size_t slot_m_offset = slot_id * n_metadata_in_slot;
size_t module_m_offset = i_module;
return slot_m_offset + module_m_offset;
}
void ImageAssembler::process(
const uint64_t bunch_id,
const int i_module,
const BufferBinaryBlock* block_buffer)
{
const auto slot_id = bunch_id % IA_N_SLOTS;
auto image_offset = get_data_offset(slot_id, i_module);
const auto image_offset_step = MODULE_N_BYTES * n_modules_;
auto meta_offset = get_metadata_offset(slot_id, i_module);
const auto meta_offset_step = n_modules_;
for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) {
memcpy(
&(frame_meta_buffer_[meta_offset]),
&(block_buffer->frame[i_pulse].metadata),
sizeof(ModuleFrame));
meta_offset += meta_offset_step;
memcpy(
image_buffer_ + image_offset,
&(block_buffer->frame[i_pulse].data[0]),
MODULE_N_BYTES);
image_offset += image_offset_step;
}
buffer_status_[slot_id].fetch_sub(1);
}
void ImageAssembler::free_slot(const uint64_t bunch_id)
{
auto slot_id = bunch_id % IA_N_SLOTS;
buffer_status_[slot_id].store(n_modules_);
}
ImageMetadataBlock* ImageAssembler::get_metadata_buffer(const uint64_t bunch_id)
{
const auto slot_id = bunch_id % IA_N_SLOTS;
auto& image_pulse_id = meta_buffer_[slot_id].pulse_id;
auto& image_frame_index = meta_buffer_[slot_id].frame_index;
auto& image_daq_rec = meta_buffer_[slot_id].daq_rec;
auto& image_is_good_frame = meta_buffer_[slot_id].is_good_image;
auto meta_offset = get_metadata_offset(slot_id, 0);
const auto meta_offset_step = 1;
uint64_t start_pulse_id = bunch_id * BUFFER_BLOCK_SIZE;
meta_buffer_[slot_id].block_start_pulse_id = start_pulse_id;
uint64_t stop_pulse_id = start_pulse_id + BUFFER_BLOCK_SIZE - 1;
meta_buffer_[slot_id].block_stop_pulse_id = stop_pulse_id;
for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) {
auto is_pulse_init = false;
image_is_good_frame[i_pulse] = 1;
for (size_t i_module=0; i_module < n_modules_; i_module++) {
auto& frame_meta = frame_meta_buffer_[meta_offset];
auto is_good_frame =
frame_meta.n_received_packets == JF_N_PACKETS_PER_FRAME;
if (!is_good_frame) {
image_is_good_frame[i_pulse] = 0;
// TODO: Update meta_offset only once in the loop.
meta_offset += meta_offset_step;
continue;
}
if (!is_pulse_init) {
image_pulse_id[i_pulse] = frame_meta.pulse_id;
image_frame_index[i_pulse] = frame_meta.frame_index;
image_daq_rec[i_pulse] = frame_meta.daq_rec;
is_pulse_init = true;
}
if (image_is_good_frame[i_pulse] == 1) {
if (image_pulse_id[i_pulse] != frame_meta.pulse_id) {
image_is_good_frame[i_pulse] = 0;
}
if (image_frame_index[i_pulse] != frame_meta.frame_index) {
image_is_good_frame[i_pulse] = 0;
}
if (image_daq_rec[i_pulse] != frame_meta.daq_rec) {
image_is_good_frame[i_pulse] = 0;
}
if (frame_meta.n_received_packets != JF_N_PACKETS_PER_FRAME) {
image_is_good_frame[i_pulse] = 0;
}
}
meta_offset += meta_offset_step;
}
}
return &(meta_buffer_[slot_id]);
}
char* ImageAssembler::get_data_buffer(const uint64_t bunch_id)
{
auto slot_id = bunch_id % IA_N_SLOTS;
return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_);
}
+205
View File
@@ -0,0 +1,205 @@
#include "JFH5Writer.hpp"
#include <sstream>
#include <cstring>
#include <hdf5_hl.h>
//extern "C"
//{
// #include "H5DOpublic.h"
// #include <bitshuffle/bshuf_h5filter.h>
//}
using namespace std;
using namespace core_buffer;
JFH5Writer::JFH5Writer(const std::string& output_file,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id,
const size_t n_modules) :
start_pulse_id_(start_pulse_id),
stop_pulse_id_(stop_pulse_id),
n_modules_(n_modules),
n_images_(stop_pulse_id - start_pulse_id + 1),
current_write_index_(0)
{
// bshuf_register_h5filter();
file_ = H5::H5File(output_file, H5F_ACC_TRUNC);
hsize_t image_dataset_dims[3] =
{n_images_, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace image_dataspace(3, image_dataset_dims);
// auto chunk_size = min(n_images_, BUFFER_BLOCK_SIZE);
// hsize_t image_dataset_chunking[3] =
// {chunk_size, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE};
// H5::DSetCreatPropList image_dataset_properties;
// image_dataset_properties.setChunk(3, image_dataset_chunking);
// // block_size, compression type
// uint compression_prop[] =
// {MODULE_N_PIXELS, //block size
// BSHUF_H5_COMPRESS_LZ4}; // Compression type
//
// H5Pset_filter(image_dataset_properties.getId(),
// BSHUF_H5FILTER,
// H5Z_FLAG_MANDATORY,
// 2,
// &(compression_prop[0]));
image_dataset_ = file_.createDataSet(
"image",
H5::PredType::NATIVE_UINT16,
image_dataspace);
b_pulse_id_ = new uint64_t[n_images_];
b_frame_index_= new uint64_t[n_images_];
b_daq_rec_ = new uint32_t[n_images_];
b_is_good_frame_ = new uint8_t[n_images_];
}
JFH5Writer::~JFH5Writer()
{
close_file();
delete[] b_pulse_id_;
delete[] b_frame_index_;
delete[] b_daq_rec_;
delete[] b_is_good_frame_;
}
void JFH5Writer::close_file()
{
if (file_.getId() == -1) {
return;
}
image_dataset_.close();
hsize_t b_m_dims[2] = {n_images_, 1};
H5::DataSpace b_m_space (2, b_m_dims);
hsize_t f_m_dims[] = {n_images_, 1};
H5::DataSpace f_m_space(2, f_m_dims);
auto pulse_id_dataset = file_.createDataSet(
"pulse_id",
H5::PredType::NATIVE_UINT64,
f_m_space);
pulse_id_dataset.write(
b_pulse_id_, H5::PredType::NATIVE_UINT64,
b_m_space, f_m_space);
pulse_id_dataset.close();
auto frame_index_dataset = file_.createDataSet(
"frame_index",
H5::PredType::NATIVE_UINT64,
f_m_space);
frame_index_dataset.write(
b_frame_index_, H5::PredType::NATIVE_UINT64,
b_m_space, f_m_space);
frame_index_dataset.close();
auto daq_rec_dataset = file_.createDataSet(
"daq_rec",
H5::PredType::NATIVE_UINT32,
f_m_space);
daq_rec_dataset.write(
b_daq_rec_, H5::PredType::NATIVE_UINT32,
b_m_space, f_m_space);
daq_rec_dataset.close();
auto is_good_frame_dataset = file_.createDataSet(
"is_good_frame",
H5::PredType::NATIVE_UINT8,
f_m_space);
is_good_frame_dataset.write(
b_is_good_frame_, H5::PredType::NATIVE_UINT8,
b_m_space, f_m_space);
is_good_frame_dataset.close();
file_.close();
}
void JFH5Writer::write(
const ImageMetadataBlock* metadata, const char* data)
{
size_t n_images_offset = 0;
if (start_pulse_id_ > metadata->block_start_pulse_id) {
n_images_offset = start_pulse_id_ - metadata->block_start_pulse_id;
}
if (n_images_offset > BUFFER_BLOCK_SIZE) {
throw runtime_error("Received unexpected block for start_pulse_id.");
}
size_t n_images_to_copy = BUFFER_BLOCK_SIZE - n_images_offset;
if (stop_pulse_id_ < metadata->block_stop_pulse_id) {
n_images_to_copy -= metadata->block_stop_pulse_id - stop_pulse_id_;
}
if (n_images_to_copy < 1) {
throw runtime_error("Received unexpected block for stop_pulse_id.");
}
hsize_t b_i_dims[3] = {BUFFER_BLOCK_SIZE,
MODULE_Y_SIZE * n_modules_,
MODULE_X_SIZE};
H5::DataSpace b_i_space(3, b_i_dims);
hsize_t b_i_count[] = {n_images_to_copy,
MODULE_Y_SIZE * n_modules_,
MODULE_X_SIZE};
hsize_t b_i_start[] = {n_images_offset, 0, 0};
b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start);
hsize_t f_i_dims[3] = {n_images_,
MODULE_Y_SIZE * n_modules_,
MODULE_X_SIZE};
H5::DataSpace f_i_space(3, f_i_dims);
hsize_t f_i_count[] = {n_images_to_copy,
MODULE_Y_SIZE * n_modules_,
MODULE_X_SIZE};
hsize_t f_i_start[] = {current_write_index_, 0, 0};
f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start);
image_dataset_.write(
data, H5::PredType::NATIVE_UINT16, b_i_space, f_i_space);
// pulse_id
{
auto b_current_ptr = b_pulse_id_ + current_write_index_;
memcpy(b_current_ptr,
&(metadata->pulse_id[n_images_offset]),
sizeof(uint64_t) * n_images_to_copy);
}
// frame_index
{
auto b_current_ptr = b_frame_index_ + current_write_index_;
memcpy(b_current_ptr,
&(metadata->frame_index[n_images_offset]),
sizeof(uint64_t) * n_images_to_copy);
}
// daq_rec
{
auto b_current_ptr = b_daq_rec_ + current_write_index_;
memcpy(b_current_ptr,
&(metadata->daq_rec[n_images_offset]),
sizeof(uint32_t) * n_images_to_copy);
}
// is_good_frame
{
auto b_current_ptr = b_is_good_frame_ + current_write_index_;
memcpy(b_current_ptr,
&(metadata->is_good_image[n_images_offset]),
sizeof(uint8_t) * n_images_to_copy);
}
current_write_index_ += n_images_to_copy;
}
-162
View File
@@ -1,162 +0,0 @@
#include "WriterH5Writer.hpp"
#include <sstream>
//extern "C"
//{
// #include "H5DOpublic.h"
// #include <bitshuffle/bshuf_h5filter.h>
//}
using namespace std;
using namespace core_buffer;
WriterH5Writer::WriterH5Writer(
const string& output_file,
const size_t n_frames,
const size_t n_modules) :
n_frames_(n_frames),
n_modules_(n_modules),
current_write_index_(0)
{
// bshuf_register_h5filter();
file_ = H5::H5File(output_file, H5F_ACC_TRUNC);
hsize_t image_dataset_dims[3] =
{n_frames_, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace image_dataspace(3, image_dataset_dims);
hsize_t image_dataset_chunking[3] =
{1, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DSetCreatPropList image_dataset_properties;
image_dataset_properties.setChunk(3, image_dataset_chunking);
// // block_size, compression type
// uint compression_prop[] =
// {MODULE_N_PIXELS, //block size
// BSHUF_H5_COMPRESS_LZ4}; // Compression type
//
// H5Pset_filter(image_dataset_properties.getId(),
// BSHUF_H5FILTER,
// H5Z_FLAG_MANDATORY,
// 2,
// &(compression_prop[0]));
image_dataset_ = file_.createDataSet(
"image",
H5::PredType::NATIVE_UINT16,
image_dataspace,
image_dataset_properties);
hsize_t metadata_dataset_dims[] = {n_frames_, 1};
H5::DataSpace metadata_dataspace(2, metadata_dataset_dims);
// Chunk cannot be larger than n_frames.
auto metadata_chunk_size = WRITER_METADATA_CHUNK_N_IMAGES;
if (n_frames < metadata_chunk_size) {
metadata_chunk_size = n_frames;
}
hsize_t metadata_dataset_chunking[] = {metadata_chunk_size, 1};
H5::DSetCreatPropList metadata_dataset_properties;
metadata_dataset_properties.setChunk(2, metadata_dataset_chunking);
pulse_id_dataset_ = file_.createDataSet(
"pulse_id",
H5::PredType::NATIVE_UINT64,
metadata_dataspace,
metadata_dataset_properties);
frame_index_dataset_ = file_.createDataSet(
"frame_index",
H5::PredType::NATIVE_UINT64,
metadata_dataspace,
metadata_dataset_properties);
daq_rec_dataset_ = file_.createDataSet(
"daq_rec",
H5::PredType::NATIVE_UINT32,
metadata_dataspace,
metadata_dataset_properties);
is_good_frame_dataset_ = file_.createDataSet(
"is_good_frame",
H5::PredType::NATIVE_UINT8,
metadata_dataspace,
metadata_dataset_properties);
}
WriterH5Writer::~WriterH5Writer()
{
close_file();
}
void WriterH5Writer::close_file()
{
image_dataset_.close();
pulse_id_dataset_.close();
frame_index_dataset_.close();
daq_rec_dataset_.close();
is_good_frame_dataset_.close();
file_.close();
}
void WriterH5Writer::write(
const ImageMetadataBuffer* metadata, const char* data)
{
auto n_images_in_buffer = metadata->n_images;
hsize_t b_i_dims[3] = {
n_images_in_buffer,
MODULE_Y_SIZE*n_modules_,
MODULE_X_SIZE};
H5::DataSpace b_i_space(3, b_i_dims);
hsize_t f_i_dims[3] = {n_frames_,
MODULE_Y_SIZE * n_modules_,
MODULE_X_SIZE};
H5::DataSpace f_i_space(3, f_i_dims);
hsize_t i_count[] = {n_images_in_buffer,
MODULE_Y_SIZE*n_modules_,
MODULE_X_SIZE};
hsize_t i_start[] = {current_write_index_, 0, 0};
f_i_space.selectHyperslab(H5S_SELECT_SET, i_count, i_start);
image_dataset_.write(
data, H5::PredType::NATIVE_UINT16,
b_i_space, f_i_space);
hsize_t b_m_dims[2] = {n_images_in_buffer, 1};
H5::DataSpace b_m_space (2, b_m_dims);
hsize_t f_m_dims[] = {n_frames_, 1};
H5::DataSpace f_m_space(2, f_m_dims);
hsize_t meta_count[] = {n_images_in_buffer, 1};
hsize_t meta_start[] = {current_write_index_, 0};
f_m_space.selectHyperslab(H5S_SELECT_SET, meta_count, meta_start);
pulse_id_dataset_.write(
&(metadata->pulse_id), H5::PredType::NATIVE_UINT64,
b_m_space, f_m_space);
frame_index_dataset_.write(
&(metadata->frame_index), H5::PredType::NATIVE_UINT64,
b_m_space, f_m_space);
daq_rec_dataset_.write(
&(metadata->daq_rec), H5::PredType::NATIVE_UINT32,
b_m_space, f_m_space);
is_good_frame_dataset_.write(
&(metadata->is_good_image), H5::PredType::NATIVE_UINT8,
b_m_space, f_m_space);
current_write_index_ += n_images_in_buffer;
}
-138
View File
@@ -1,138 +0,0 @@
#include "WriterZmqReceiver.hpp"
#include "zmq.h"
#include "date.h"
#include <chrono>
#include <sstream>
using namespace std;
using namespace core_buffer;
WriterZmqReceiver::WriterZmqReceiver(
void *ctx,
const string &ipc_prefix,
const size_t n_modules,
const uint64_t stop_pulse_id_) :
n_modules_(n_modules),
sockets_(n_modules),
stop_pulse_id_(stop_pulse_id_)
{
for (size_t i = 0; i < n_modules; i++) {
sockets_[i] = zmq_socket(ctx, ZMQ_PULL);
int rcvhwm = WRITER_RCVHWM;
if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm,
sizeof(rcvhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
int linger = 0;
if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger,
sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
stringstream ipc_addr;
ipc_addr << ipc_prefix << i;
const auto ipc = ipc_addr.str();
if (zmq_connect(sockets_[i], ipc.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}
}
}
WriterZmqReceiver::~WriterZmqReceiver()
{
for (size_t i = 0; i < n_modules_; i++) {
zmq_close(sockets_[i]);
}
}
void WriterZmqReceiver::get_next_buffer(
const uint64_t start_pulse_id,
ImageMetadataBuffer* i_meta,
char* image_buffer)
{
auto n_images_in_buffer = WRITER_DATA_CACHE_N_IMAGES;
auto images_left = stop_pulse_id_ - start_pulse_id + 1;
if (images_left < n_images_in_buffer) {
n_images_in_buffer = images_left;
}
i_meta->n_images = (uint16_t)n_images_in_buffer;
for (uint64_t i_pulse=0; i_pulse<n_images_in_buffer; i_pulse++) {
auto pulse_id = start_pulse_id + i_pulse;
bool pulse_id_initialized = false;
i_meta->pulse_id[i_pulse] = pulse_id;
i_meta->is_good_image[i_pulse] = 1;
i_meta->frame_index[i_pulse] = 0;
i_meta->daq_rec[i_pulse] = 0;
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
auto n_bytes_metadata = zmq_recv(
sockets_[i_module], &f_meta_, sizeof(f_meta_), 0);
if (n_bytes_metadata != sizeof(f_meta_)) {
throw runtime_error("Wrong number of metadata bytes.");
}
if (f_meta_.pulse_id == 0) {
i_meta->is_good_image[i_pulse] = 0;
} else {
if (!pulse_id_initialized) {
// Init the image metadata with the first valid frame.
pulse_id_initialized = true;
i_meta->frame_index[i_pulse] = f_meta_.frame_index;
i_meta->daq_rec[i_pulse] = f_meta_.daq_rec;
}
if (f_meta_.pulse_id != i_meta->pulse_id[i_pulse]) {
stringstream err_msg;
err_msg << "[WriterZmqReceiver::get_next_buffer]";
err_msg << " Read unexpected pulse_id. ";
err_msg << " Expected " << pulse_id;
err_msg << " received ";
err_msg << f_meta_.pulse_id;
err_msg << " from i_module " << i_module << endl;
throw runtime_error(err_msg.str());
}
}
// Once the image is not good, we don't care to re-flag it.
if (i_meta->is_good_image[i_pulse] == 1) {
if (f_meta_.frame_index != i_meta->frame_index[i_pulse]) {
i_meta->is_good_image[i_pulse] = 0;
}
if (f_meta_.daq_rec != i_meta->daq_rec[i_pulse]) {
i_meta->is_good_image[i_pulse] = 0;
}
if (f_meta_.n_received_packets != JF_N_PACKETS_PER_FRAME) {
i_meta->is_good_image[i_pulse] = 0;
}
}
auto pulse_offset = i_pulse * n_modules_ * MODULE_N_BYTES ;
auto module_offset = i_module * MODULE_N_BYTES;
auto n_bytes_image = zmq_recv(
sockets_[i_module],
(image_buffer + pulse_offset + module_offset),
MODULE_N_BYTES, 0);
if (n_bytes_image != MODULE_N_BYTES) {
throw runtime_error("Wrong number of data bytes.");
}
}
}
}
+88 -93
View File
@@ -1,75 +1,72 @@
#include <iostream>
#include <stdexcept>
#include "buffer_config.hpp"
#include "zmq.h"
#include <string>
#include <jungfrau.hpp>
#include <thread>
#include <chrono>
#include "WriterH5Writer.hpp"
#include <FastQueue.hpp>
#include <cstring>
#include "date.h"
#include "zmq.h"
#include "buffer_config.hpp"
#include "bitshuffle/bitshuffle.h"
#include "WriterZmqReceiver.hpp"
#include "JFH5Writer.hpp"
#include "ImageAssembler.hpp"
#include "BufferBinaryReader.hpp"
using namespace std;
using namespace core_buffer;
using namespace chrono;
void receive_replay(
void* ctx,
const string ipc_prefix,
const size_t n_modules,
FastQueue<ImageMetadataBuffer>& queue,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id)
void read_buffer(
const string device,
const string channel_name,
const int i_module,
const vector<uint64_t>& buffer_blocks,
ImageAssembler& image_assembler)
{
try {
WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id);
BufferBinaryReader block_reader(device, channel_name);
auto block_buffer = new BufferBinaryBlock();
uint64_t current_pulse_id=start_pulse_id;
for (uint64_t block_id:buffer_blocks) {
// "<= stop_pulse_id" because we include the last pulse_id.
while(current_pulse_id<=stop_pulse_id) {
int slot_id;
while((slot_id = queue.reserve()) == -1) {
this_thread::sleep_for(chrono::milliseconds(
RB_READ_RETRY_INTERVAL_MS));
}
auto metadata = queue.get_metadata_buffer(slot_id);
auto buffer = queue.get_data_buffer(slot_id);
receiver.get_next_buffer(
current_pulse_id, metadata, buffer);
queue.commit();
current_pulse_id += metadata->n_images;
while(!image_assembler.is_slot_free(block_id)) {
this_thread::sleep_for(chrono::milliseconds(
WRITER_IMAGE_ASSEMBLER_RETRY_MS));
}
} catch (const std::exception& e) {
using namespace date;
using namespace chrono;
auto start_time = steady_clock::now();
cout << "[" << system_clock::now() << "]";
cout << "[sf_writer::receive_replay]";
cout << " Stopped because of exception: " << endl;
cout << e.what() << endl;
block_reader.get_block(block_id, block_buffer);
throw;
auto end_time = steady_clock::now();
uint64_t read_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
start_time = steady_clock::now();
image_assembler.process(block_id, i_module, block_buffer);
end_time = steady_clock::now();
uint64_t compose_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
cout << "sf_replay:avg_read_us ";
cout << read_us_duration / BUFFER_BLOCK_SIZE << endl;
cout << "sf_replay:avg_assemble_us ";
cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl;
}
delete block_buffer;
}
int main (int argc, char *argv[])
{
if (argc != 5) {
cout << endl;
cout << "Usage: sf_writer ";
cout << " [ipc_id] [output_file] [start_pulse_id] [stop_pulse_id]";
cout << "Usage: sf_writer [output_file] [device]";
cout << " [start_pulse_id] [stop_pulse_id]";
cout << endl;
cout << "\tipc_id: Unique identifier for ipc." << endl;
cout << "\toutput_file: Complete path to the output file." << endl;
cout << "\tdevice: Name of detector." << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
cout << endl;
@@ -77,77 +74,75 @@ int main (int argc, char *argv[])
exit(-1);
}
const string ipc_id = string(argv[1]);
string output_file = string(argv[2]);
string output_file = string(argv[1]);
const string device = string(argv[2]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[3]);
uint64_t stop_pulse_id = (uint64_t) atoll(argv[4]);
size_t n_modules = 32;
FastQueue<ImageMetadataBuffer> queue(
MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES,
WRITER_FASTQUEUE_N_SLOTS);
uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE;
uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE;
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS);
ImageAssembler image_assembler(n_modules);
auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-";
thread replay_receive_thread(receive_replay,
ctx, ipc_base, n_modules,
ref(queue), start_pulse_id, stop_pulse_id);
// Generate list of buffer blocks that need to be loaded.
std::vector<uint64_t> buffer_blocks;
for (uint64_t curr_block=start_block;
curr_block<=stop_block;
curr_block++) {
buffer_blocks.push_back(curr_block);
}
size_t n_frames = stop_pulse_id - start_pulse_id + 1;
WriterH5Writer writer(output_file, n_frames, n_modules);
std::vector<std::thread> reading_threads(n_modules);
for (size_t i_module=0; i_module<n_modules; i_module++) {
auto current_pulse_id = start_pulse_id;
// "<= stop_pulse_id" because we include the last pulse_id.
while (current_pulse_id <= stop_pulse_id) {
// TODO: Very ugly. Fix.
string channel_name = "M";
if (i_module < 10) {
channel_name += "0";
}
channel_name += to_string(i_module);
auto start_time = chrono::steady_clock::now();
reading_threads.emplace_back(
read_buffer,
device,
channel_name,
i_module,
ref(buffer_blocks),
ref(image_assembler));
}
int slot_id;
while((slot_id = queue.read()) == -1) {
JFH5Writer writer(output_file, start_pulse_id, stop_pulse_id, n_modules);
for (uint64_t block_id:buffer_blocks) {
while(!image_assembler.is_slot_full(block_id)) {
this_thread::sleep_for(chrono::milliseconds(
RB_READ_RETRY_INTERVAL_MS));
WRITER_IMAGE_ASSEMBLER_RETRY_MS));
}
auto metadata = queue.get_metadata_buffer(slot_id);
auto data = queue.get_data_buffer(slot_id);
auto metadata = image_assembler.get_metadata_buffer(block_id);
auto data = image_assembler.get_data_buffer(block_id);
// Verify that all pulse_ids are correct.
for (int i=0; i<metadata->n_images; i++) {
if (metadata->pulse_id[i] != current_pulse_id) {
throw runtime_error("Wrong pulse id from receiver thread.");
}
current_pulse_id++;
}
auto end_time = chrono::steady_clock::now();
auto read_us_duration = chrono::duration_cast<chrono::microseconds>(
end_time-start_time).count();
start_time = chrono::steady_clock::now();
auto start_time = steady_clock::now();
writer.write(metadata, data);
end_time = chrono::steady_clock::now();
auto end_time = steady_clock::now();
auto write_us_duration = chrono::duration_cast<chrono::microseconds>(
end_time-start_time).count();
queue.release();
image_assembler.free_slot(block_id);
auto avg_read_us = read_us_duration / metadata->n_images;;
auto avg_write_us = write_us_duration / metadata->n_images;;
cout << "sf_writer:avg_read_us " << avg_read_us;
cout << " sf_writer:avg_write_us " << avg_write_us;
cout << endl;
cout << "sf_writer:avg_write_us ";
cout << write_us_duration / BUFFER_BLOCK_SIZE << endl;
}
writer.close_file();
for (auto& reading_thread :reading_threads) {
if (reading_thread.joinable()) {
reading_thread.join();
}
}
//wait till receive thread is finished
replay_receive_thread.join();
return 0;
}
+1 -11
View File
@@ -3,18 +3,8 @@ add_executable(sf-writer-tests main.cpp)
target_link_libraries(sf-writer-tests
sf-writer-lib
hdf5
hdf5_hl
hdf5_cpp
zmq
gtest
)
add_executable(sf-writer-recv manual/test_sf_writer_recv.cpp)
set_target_properties(sf-writer-recv PROPERTIES OUTPUT_NAME sf_writer)
set_target_properties(sf-writer-recv PROPERTIES EXCLUDE_FROM_ALL TRUE)
target_link_libraries(sf-writer-recv
sf-writer-lib
zmq
hdf5
hdf5_cpp
pthread
)
+1 -1
View File
@@ -1,6 +1,6 @@
#include "gtest/gtest.h"
#include "test_WriterZmqReceiver.cpp"
#include "test_WriterH5Writer.cpp"
#include "test_ImageAssembler.cpp"
using namespace std;
@@ -1,130 +0,0 @@
#include <iostream>
#include "buffer_config.hpp"
#include "zmq.h"
#include <string>
#include <thread>
#include <chrono>
#include "WriterH5Writer.hpp"
#include <FastQueue.hpp>
#include <cstring>
#include "date.h"
#include "bitshuffle/bitshuffle.h"
#include "WriterZmqReceiver.hpp"
using namespace std;
using namespace core_buffer;
void receive_replay(
void* ctx,
const string ipc_prefix,
const size_t n_modules,
FastQueue<ImageMetadataBuffer>& queue,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id)
{
try {
WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id);
int slot_id;
while((slot_id = queue.reserve()) == -1) {
this_thread::sleep_for(chrono::milliseconds(
RB_READ_RETRY_INTERVAL_MS));
}
uint64_t pulse_id = start_pulse_id;
// "<= stop_pulse_id" because we include the last pulse_id.
while(pulse_id <= stop_pulse_id) {
auto start_time = chrono::steady_clock::now();
auto image_metadata = queue.get_metadata_buffer(slot_id);
auto image_buffer = queue.get_data_buffer(slot_id);
receiver.get_next_buffer(pulse_id, image_metadata, image_buffer);
pulse_id += image_metadata->n_images;
auto end_time = chrono::steady_clock::now();
auto read_us_duration = chrono::duration_cast<chrono::microseconds>(
end_time-start_time).count();
cout << "sf_writer::avg_read_us ";
cout << read_us_duration / image_metadata->n_images << endl;
}
queue.commit();
} catch (const std::exception& e) {
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[sf_writer::receive_replay]";
cout << " Stopped because of exception: " << endl;
cout << e.what() << endl;
throw;
}
}
int main (int argc, char *argv[])
{
if (argc != 5) {
cout << endl;
cout << "Usage: sf_writer ";
cout << " [ipc_id] [output_file] [start_pulse_id] [stop_pulse_id]";
cout << endl;
cout << "\tipc_id: Unique identifier for ipc." << endl;
cout << "\toutput_file: Complete path to the output file." << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
cout << endl;
exit(-1);
}
const string ipc_id = string(argv[1]);
string output_file = string(argv[2]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[3]);
uint64_t stop_pulse_id = (uint64_t) atoll(argv[4]);
size_t n_modules = 32;
FastQueue<ImageMetadataBuffer> queue(
MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES,
WRITER_FASTQUEUE_N_SLOTS);
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS);
auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-";
receive_replay(
ctx, ipc_base, n_modules,queue, start_pulse_id, stop_pulse_id);
//
//
// auto current_pulse_id = start_pulse_id;
// // "<= stop_pulse_id" because we include the last pulse_id.
// while (current_pulse_id <= stop_pulse_id) {
//
// int slot_id;
// while((slot_id = queue.read()) == -1) {
// this_thread::sleep_for(chrono::milliseconds(
// RB_READ_RETRY_INTERVAL_MS));
// }
//
// auto metadata = queue.get_metadata_buffer(slot_id);
//
// for (int i_pulse=0; i_pulse < metadata->n_images; i_pulse++) {
// cout << "Written image " << metadata->pulse_id[i_pulse] << endl;
//
// }
//
// queue.release();
// current_pulse_id += metadata->n_images;
// }
//
// //wait till receive thread is finished
// replay_receive_thread.join();
return 0;
}
+90
View File
@@ -0,0 +1,90 @@
#include <memory>
#include "ImageAssembler.hpp"
#include "gtest/gtest.h"
using namespace std;
using namespace core_buffer;
TEST(ImageAssembler, basic_interaction)
{
size_t n_modules = 3;
uint64_t bunch_id = 0;
ImageAssembler assembler(n_modules);
ASSERT_EQ(assembler.is_slot_free(bunch_id), true);
auto buffer_block = make_unique<BufferBinaryBlock>();
auto buffer_ptr = buffer_block.get();
for (size_t i_module=0; i_module < n_modules; i_module++) {
assembler.process(bunch_id, i_module, buffer_ptr);
}
ASSERT_EQ(assembler.is_slot_full(bunch_id), true);
auto metadata = assembler.get_metadata_buffer(bunch_id);
auto data = assembler.get_data_buffer(bunch_id);
assembler.free_slot(bunch_id);
ASSERT_EQ(assembler.is_slot_free(bunch_id), true);
for (size_t i_pulse = 0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) {
ASSERT_EQ(metadata->is_good_image[i_pulse], 0);
}
}
TEST(ImageAssembler, reconstruction)
{
size_t n_modules = 2;
uint64_t bunch_id = 0;
ImageAssembler assembler(n_modules);
ASSERT_EQ(assembler.is_slot_free(bunch_id), true);
auto buffer_block = make_unique<BufferBinaryBlock>();
auto buffer_ptr = buffer_block.get();
for (size_t i_module=0; i_module < n_modules; i_module++) {
for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) {
auto& frame_meta = buffer_block->frame[i_pulse].metadata;
frame_meta.pulse_id = 100 + i_pulse;
frame_meta.daq_rec = 1000 + i_pulse;
frame_meta.frame_index = 10000 + i_pulse;
frame_meta.n_received_packets = JF_N_PACKETS_PER_FRAME;
for (size_t i_pixel=0; i_pixel < MODULE_N_PIXELS; i_pixel++) {
buffer_block->frame[i_pulse].data[i_pixel] =
(i_module * 10) + (i_pixel % 100);
}
}
assembler.process(bunch_id, i_module, buffer_ptr);
}
ASSERT_EQ(assembler.is_slot_full(bunch_id), true);
auto metadata = assembler.get_metadata_buffer(bunch_id);
auto data = assembler.get_data_buffer(bunch_id);
assembler.free_slot(bunch_id);
ASSERT_EQ(assembler.is_slot_free(bunch_id), true);
ASSERT_EQ(metadata->block_start_pulse_id, 0);
ASSERT_EQ(metadata->block_stop_pulse_id, BUFFER_BLOCK_SIZE-1);
for (size_t i_pulse = 0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) {
ASSERT_EQ(metadata->pulse_id[i_pulse], 100 + i_pulse);
ASSERT_EQ(metadata->daq_rec[i_pulse], 1000 + i_pulse);
ASSERT_EQ(metadata->frame_index[i_pulse], 10000 + i_pulse);
ASSERT_EQ(metadata->is_good_image[i_pulse], 1);
for (size_t i_module=0; i_module < n_modules; i_module++) {
// TODO: Check assembled image.
}
}
}
+94 -74
View File
@@ -1,92 +1,112 @@
#include <memory>
#include "WriterH5Writer.hpp"
#include "JFH5Writer.hpp"
#include "gtest/gtest.h"
#include "bitshuffle/bitshuffle.h"
using namespace std;
using namespace core_buffer;
TEST(WriterH5Writer, basic_interaction)
{
size_t n_modules = 2;
size_t n_frames = 5;
uint64_t start_pulse_id = 1;
uint64_t stop_pulse_id = 5;
auto data = make_unique<char[]>(n_modules*MODULE_N_BYTES);
auto metadata = make_shared<ImageMetadataBuffer>();
auto data = make_unique<char[]>(n_modules*MODULE_N_BYTES*BUFFER_BLOCK_SIZE);
auto metadata = make_shared<ImageMetadataBlock>();
// Needed by writer.
metadata->data_n_bytes[0] = 500;
metadata->n_pulses_in_buffer = 1;
metadata->block_start_pulse_id = 0;
metadata->block_stop_pulse_id = BUFFER_BLOCK_SIZE - 1;
WriterH5Writer writer("ignore.h5", n_frames, n_modules);
JFH5Writer writer("ignore.h5", start_pulse_id, stop_pulse_id, n_modules);
writer.write(metadata.get(), data.get());
writer.close_file();
}
TEST(WriterH5Writer, test_compression)
TEST(WriterH5Writer, test_writing)
{
// size_t n_modules = 2;
// size_t n_frames = 2;
//
// auto comp_buffer_size = bshuf_compress_lz4_bound(
// MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS);
//
// auto f_raw_buffer = make_unique<uint16_t[]>(MODULE_N_PIXELS);
// auto f_comp_buffer = make_unique<char[]>(comp_buffer_size);
//
// auto i_comp_buffer = make_unique<char[]>(
// (comp_buffer_size * n_modules) + BSHUF_LZ4_HEADER_BYTES);
// auto i_raw_buffer = make_unique<uint16_t[]>(
// MODULE_N_PIXELS * n_modules * n_frames);
//
// bshuf_write_uint64_BE(&i_comp_buffer[0],
// MODULE_N_BYTES * n_modules);
// bshuf_write_uint32_BE(&i_comp_buffer[8],
// MODULE_N_PIXELS * PIXEL_N_BYTES);
//
// size_t total_compressed_size = BSHUF_LZ4_HEADER_BYTES;
// for (int i_module=0; i_module<n_modules; i_module++) {
//
// for (size_t i=0; i<MODULE_N_PIXELS; i++) {
// f_raw_buffer[i] = (uint16_t)((i % 100) + (i_module*100));
// }
//
// auto compressed_size = bshuf_compress_lz4(
// f_raw_buffer.get(), f_comp_buffer.get(),
// MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS);
//
// memcpy((i_comp_buffer.get() + total_compressed_size),
// f_comp_buffer.get(),
// compressed_size);
//
// total_compressed_size += compressed_size;
// }
//
// auto metadata = make_shared<ImageMetadata>();
// metadata->data_n_bytes = total_compressed_size;
//
// metadata->is_good_frame = 1;
// metadata->frame_index = 3;
// metadata->pulse_id = 3;
// metadata->daq_rec = 3;
//
// auto result = bshuf_decompress_lz4(
// &i_comp_buffer[12], &i_raw_buffer[0],
// MODULE_N_PIXELS*n_modules, PIXEL_N_BYTES, MODULE_N_PIXELS);
//
// WriterH5Writer writer("ignore.h5", n_frames, n_modules);
// writer.write(metadata.get(), &i_comp_buffer[0]);
// writer.close_file();
//
// H5::H5File reader("ignore.h5", H5F_ACC_RDONLY);
// auto image_dataset = reader.openDataSet("image");
// image_dataset.read(&i_raw_buffer[0], H5::PredType::NATIVE_UINT16);
//
// for (int i_module=0; i_module<n_modules; i_module++) {
// for (int i_pixel=0; i_pixel<MODULE_N_PIXELS; i_pixel++) {
// size_t offset = (i_module * MODULE_N_PIXELS) + i_pixel;
// ASSERT_EQ(i_raw_buffer[offset],
// (uint16_t)((i_pixel % 100) + (i_module*100)));
// }
// }
size_t n_modules = 2;
uint64_t start_pulse_id = 5;
uint64_t stop_pulse_id = 10;
auto n_images = stop_pulse_id - start_pulse_id + 1;
auto metadata = make_shared<ImageMetadataBlock>();
metadata->block_start_pulse_id = 0;
metadata->block_stop_pulse_id = BUFFER_BLOCK_SIZE - 1;
for (uint64_t pulse_id=start_pulse_id;
pulse_id<=stop_pulse_id;
pulse_id++) {
metadata->pulse_id[pulse_id] = pulse_id;
metadata->frame_index[pulse_id] = pulse_id + 10;
metadata->daq_rec[pulse_id] = pulse_id + 100;
metadata->is_good_image[pulse_id] = 1;
}
auto image_buffer = make_unique<uint16_t[]>(
MODULE_N_PIXELS * n_modules * BUFFER_BLOCK_SIZE);
for (int i_block=0; i_block<=BUFFER_BLOCK_SIZE; i_block++) {
for (int i_module=0; i_module<n_modules; i_module++) {
auto offset = i_block * MODULE_N_PIXELS;
offset += i_module * MODULE_N_PIXELS;
for (int i_pixel=0; i_pixel<MODULE_N_PIXELS; i_pixel++) {
image_buffer[offset + i_pixel] = i_pixel % 100;
}
}
}
// The writer closes the file on destruction.
{
JFH5Writer writer(
"ignore.h5", start_pulse_id, stop_pulse_id, n_modules);
writer.write(metadata.get(), (char*)(&image_buffer[0]));
}
H5::H5File reader("ignore.h5", H5F_ACC_RDONLY);
auto image_dataset = reader.openDataSet("image");
image_dataset.read(&image_buffer[0], H5::PredType::NATIVE_UINT16);
for (int i_image=0; i_image < n_images; i_image++) {
for (int i_module=0; i_module<n_modules; i_module++) {
auto offset = i_image * MODULE_N_PIXELS;
offset += i_module * MODULE_N_PIXELS;
for (int i_pixel=0; i_pixel<MODULE_N_PIXELS; i_pixel++) {
ASSERT_EQ(image_buffer[offset + i_pixel], i_pixel % 100);
}
}
}
auto pulse_id_data = make_unique<uint64_t[]>(n_images);
auto pulse_id_dataset = reader.openDataSet("pulse_id");
pulse_id_dataset.read(&pulse_id_data[0], H5::PredType::NATIVE_UINT64);
auto frame_index_data = make_unique<uint64_t[]>(n_images);
auto frame_index_dataset = reader.openDataSet("frame_index");
frame_index_dataset.read(&frame_index_data[0], H5::PredType::NATIVE_UINT64);
auto daq_rec_data = make_unique<uint32_t[]>(n_images);
auto daq_rec_dataset = reader.openDataSet("daq_rec");
daq_rec_dataset.read(&daq_rec_data[0], H5::PredType::NATIVE_UINT32);
auto is_good_frame_data = make_unique<uint8_t[]>(n_images);
auto is_good_frame_dataset = reader.openDataSet("is_good_frame");
is_good_frame_dataset.read(
&is_good_frame_data[0], H5::PredType::NATIVE_UINT8);
for (uint64_t pulse_id=start_pulse_id;
pulse_id<=stop_pulse_id;
pulse_id++) {
ASSERT_EQ(pulse_id_data[pulse_id - start_pulse_id], pulse_id);
ASSERT_EQ(frame_index_data[pulse_id - start_pulse_id], pulse_id + 10);
ASSERT_EQ(daq_rec_data[pulse_id - start_pulse_id], pulse_id + 100);
ASSERT_EQ(is_good_frame_data[pulse_id - start_pulse_id], 1);
}
}
-80
View File
@@ -1,80 +0,0 @@
#include <gtest/gtest.h>
#include "WriterZmqReceiver.hpp"
#include "bitshuffle/bitshuffle.h"
#include <thread>
#include <sstream>
#include "buffer_config.hpp"
#include "zmq.h"
using namespace std;
using namespace core_buffer;
TEST(WriterZmqReceiver, basic_test)
{
size_t n_modules = 4;
uint64_t pulse_id = 12345;
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1);
void* sockets[n_modules];
for (size_t i = 0; i < n_modules; i++) {
sockets[i] = zmq_socket(ctx, ZMQ_PUSH);
int linger = 0;
if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger,
sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
stringstream ipc_addr;
ipc_addr << REPLAY_STREAM_IPC_URL << i;
const auto ipc = ipc_addr.str();
if (zmq_bind(sockets[i], ipc.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}
}
this_thread::sleep_for(chrono::milliseconds(100));
WriterZmqReceiver receiver(ctx, REPLAY_STREAM_IPC_URL, n_modules);
this_thread::sleep_for(chrono::milliseconds(100));
size_t compressed_frame_size = 5000;
auto frame_buffer = make_unique<char[]>(compressed_frame_size);
ImageMetadata image_metadata;
auto compress_size = bshuf_compress_lz4_bound(
MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS);
auto image_buffer = make_unique<char[]>(compress_size * n_modules);
for (size_t i = 0; i < n_modules; i++) {
ReplayModuleFrameBuffer frame_metadata;
frame_metadata.metadata.pulse_id = pulse_id;
frame_metadata.metadata.frame_index = pulse_id + 100;
frame_metadata.metadata.n_received_packets = 128;
frame_metadata.metadata.daq_rec = 4;
frame_metadata.is_frame_present = 1;
frame_metadata.data_n_bytes = compressed_frame_size;
zmq_send(sockets[i],
&frame_metadata,
sizeof(ReplayModuleFrameBuffer),
ZMQ_SNDMORE);
zmq_send(sockets[i],
(char*)(frame_buffer.get()),
compressed_frame_size,
0);
}
receiver.get_next_buffer(pulse_id, &image_metadata, image_buffer.get());
EXPECT_EQ(pulse_id, image_metadata.pulse_id);
EXPECT_EQ(image_metadata.is_good_frame, 1);
EXPECT_EQ(image_metadata.daq_rec, 4);
EXPECT_EQ(image_metadata.data_n_bytes,
5000*n_modules);
// 5000*n_modules+BSHUF_LZ4_HEADER_BYTES);
}