diff --git a/CMakeLists.txt b/CMakeLists.txt index 63c25dc..659699f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.12) project(sf_daq_buffer) set(CMAKE_CXX_STANDARD 17) -set (LIB_CPP_H5_WRITER_VERSION "1.0.0") +set (SF_DAQ_BUFFER_VERSION "1.0.0") set (CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) @@ -34,4 +34,5 @@ add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") add_subdirectory("sf-stream") add_subdirectory("sf-writer") -add_subdirectory("jf-live-writer") +#add_subdirectory("jf-live-writer") + diff --git a/core-buffer/include/BufferUtils.hpp b/core-buffer/include/BufferUtils.hpp index e3ffcef..d24802c 100644 --- a/core-buffer/include/BufferUtils.hpp +++ b/core-buffer/include/BufferUtils.hpp @@ -42,6 +42,10 @@ namespace BufferUtils const std::string& module_name, const uint64_t pulse_id); + std::string get_image_filename( + const std::string& detector_folder, + const uint64_t pulse_id); + std::size_t get_file_frame_index(const uint64_t pulse_id); void update_latest_file( diff --git a/core-buffer/src/BufferUtils.cpp b/core-buffer/src/BufferUtils.cpp index 7fd100b..bd71471 100644 --- a/core-buffer/src/BufferUtils.cpp +++ b/core-buffer/src/BufferUtils.cpp @@ -15,6 +15,24 @@ using namespace std; using namespace buffer_config; +string BufferUtils::get_image_filename( + const std::string& detector_folder, + const uint64_t pulse_id) +{ + uint64_t data_folder = pulse_id / buffer_config::FOLDER_MOD; + data_folder *= buffer_config::FOLDER_MOD; + + uint64_t data_file = pulse_id / buffer_config::FILE_MOD; + data_file *= buffer_config::FILE_MOD; + + stringstream folder; + folder << detector_folder << "/"; + folder << data_folder << "/"; + folder << data_file << buffer_config::FILE_EXTENSION; + + return folder.str(); +} + string BufferUtils::get_filename( const std::string& detector_folder, const std::string& module_name, @@ -151,16 +169,6 @@ BufferUtils::DetectorConfig BufferUtils::read_json_config( 
config_parameters["detector_name"].GetString(), config_parameters["n_modules"].GetInt(), config_parameters["start_udp_port"].GetInt(), - // config_parameters["buffer_folder"].GetString() - "" - }; - - #ifdef DEBUG_OUTPUT - using namespace date; - cout << " [" << std::chrono::system_clock::now(); - cout << "] [BufferUtils::read_json_config] Config:"; - cout << det_config; - cout << endl; - #endif - return det_config; + config_parameters["buffer_folder"].GetString(), + }; return det_config; } diff --git a/jf-live-writer/CMakeLists.txt b/jf-live-writer/CMakeLists.txt index 1faa611..596db50 100644 --- a/jf-live-writer/CMakeLists.txt +++ b/jf-live-writer/CMakeLists.txt @@ -1,11 +1,20 @@ +find_package(MPI REQUIRED) +# Because of openmpi. +add_definitions(-DOMPI_SKIP_MPICXX) + file(GLOB SOURCES src/*.cpp) add_library(jf-live-writer-lib STATIC ${SOURCES}) -target_include_directories(jf-live-writer-lib PUBLIC include/) +target_include_directories(jf-live-writer-lib PUBLIC include/) +target_include_directories( + jf-live-writer-lib SYSTEM PUBLIC ${MPI_INCLUDE_PATH}) + target_link_libraries(jf-live-writer-lib external - core-buffer-lib) + core-buffer-lib + ${MPI_LIBRARIES} + ) add_executable(jf-live-writer src/main.cpp) @@ -20,11 +29,11 @@ set_target_properties(jf-live-writer PROPERTIES OUTPUT_NAME ${LIB_NAME_UDP_RECV} target_link_libraries(jf-live-writer jf-live-writer-lib - sf-writer-lib + zmq hdf5 hdf5_hl hdf5_cpp - pthread + rt ) enable_testing() diff --git a/jf-live-writer/README.md b/jf-live-writer/README.md new file mode 100644 index 0000000..ce4349c --- /dev/null +++ b/jf-live-writer/README.md @@ -0,0 +1,13 @@ +# jf-live-writer + +## Install PHDF5 manually +``` +wget https://support.hdfgroup.org/ftp/HDF5/releases/hdf5-1.12/hdf5-1.12.0/src/hdf5-1.12.0.tar.gz +tar -xzf hdf5-1.12.0.tar.gz +cd hdf5-1.12.0 +./configure --enable-parallel +make install +sudo ln -v -s `pwd`/hdf5/lib/* /usr/lib64/ +sudo ln -v -s `pwd`/hdf5/include/* /usr/include/ +``` + diff --git a/jf-live-writer/include/ImageBinaryWriter.hpp
b/jf-live-writer/include/ImageBinaryWriter.hpp deleted file mode 100644 index 8e6ebfb..0000000 --- a/jf-live-writer/include/ImageBinaryWriter.hpp +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef BINARYWRITER_HPP -#define BINARYWRITER_HPP - -#include - -#include "formats.hpp" - -class ImageBinaryWriter { - - const size_t MAX_FILE_BYTES = - buffer_config::FILE_MOD * sizeof(BufferBinaryFormat); - - const std::string detector_folder_; - std::string latest_filename_; - - std::string current_output_filename_; - int output_file_fd_; - - void open_file(const std::string& filename); - void close_current_file(); - - -public: - ImageBinaryWriter(const std::string& detector_folder); - - virtual ~ImageBinaryWriter(); - - void write(const uint64_t pulse_id, const BufferBinaryFormat* buffer); - -}; - - -#endif //BINARYWRITER_HPP diff --git a/jf-live-writer/include/JFH5Writer.hpp b/jf-live-writer/include/JFH5Writer.hpp new file mode 100644 index 0000000..3a681f2 --- /dev/null +++ b/jf-live-writer/include/JFH5Writer.hpp @@ -0,0 +1,58 @@ +#ifndef SFWRITER_HPP +#define SFWRITER_HPP + +#include +#include +#include +#include + +extern "C" { + #include } + +class JFH5Writer { + + const std::string root_folder_; + const std::string detector_name_; + + static constexpr int64_t NO_RUN_ID = -1; + + // Run specific variables. + int64_t current_run_id_ = NO_RUN_ID; + uint32_t image_y_size_ = 0; + uint32_t image_x_size_ = 0; + uint32_t bits_per_pixel_ = 0; + + // Open file specific variables.
+ hid_t file_id_ = -1; + hid_t image_dataset_id_ = -1; + hid_t pulse_dataset_id_= -1; + hid_t frame_dataset_id_ = -1; + hid_t daq_rec_dataset_id_ = -1; + hid_t is_good_dataset_id_ = -1; + + hid_t get_datatype(const int bits_per_pixel); + void open_file(const std::string& output_file, const uint32_t n_images); + void close_file(); + +public: + JFH5Writer(const BufferUtils::DetectorConfig config); + ~JFH5Writer(); + + void open_run(const int64_t run_id, + const uint32_t n_images, + const uint32_t image_y_size, + const uint32_t image_x_size, + const uint32_t bits_per_pixel); + void close_run(); + + void write_data(const int64_t run_id, + const uint32_t index, + const char* data); + + void write_meta(const int64_t run_id, + const uint32_t index, + const ImageMetadata& meta); +}; + +#endif //SFWRITER_HPP diff --git a/jf-live-writer/include/WriterStats.hpp b/jf-live-writer/include/WriterStats.hpp index cb023a7..775a46f 100644 --- a/jf-live-writer/include/WriterStats.hpp +++ b/jf-live-writer/include/WriterStats.hpp @@ -8,7 +8,8 @@ class WriterStats { const std::string detector_name_; - size_t stats_modulo_; + const size_t stats_modulo_; + const size_t image_n_bytes_; int image_counter_; uint32_t total_buffer_write_us_; @@ -21,7 +22,8 @@ class WriterStats { public: WriterStats( const std::string &detector_name, - const size_t stats_modulo); + const size_t stats_modulo, + const size_t image_n_bytes); void start_image_write(); void end_image_write(); }; diff --git a/jf-live-writer/include/broker_format.hpp b/jf-live-writer/include/broker_format.hpp new file mode 100644 index 0000000..ecdca44 --- /dev/null +++ b/jf-live-writer/include/broker_format.hpp @@ -0,0 +1,23 @@ +#ifndef SF_DAQ_BUFFER_BROKER_FORMAT_HPP +#define SF_DAQ_BUFFER_BROKER_FORMAT_HPP + +#include "formats.hpp" + +const static uint8_t OP_START = 1; +const static uint8_t OP_END = 2; + +#pragma pack(push) +#pragma pack(1) +struct StoreStream { + int64_t run_id; + uint32_t i_image; + uint32_t n_images; + uint32_t 
image_y_size; + uint32_t image_x_size; + uint32_t op_code; + uint32_t bits_per_pixel; + + ImageMetadata image_metadata; +}; +#pragma pack(pop) +#endif //SF_DAQ_BUFFER_BROKER_FORMAT_HPP diff --git a/jf-live-writer/include/live_writer_config.hpp b/jf-live-writer/include/live_writer_config.hpp index 76d9b05..d47c531 100644 --- a/jf-live-writer/include/live_writer_config.hpp +++ b/jf-live-writer/include/live_writer_config.hpp @@ -4,4 +4,6 @@ namespace live_writer_config { // N of IO threads to receive data from modules. const int LIVE_ZMQ_IO_THREADS = 1; + + const std::string OUTPUT_FOLDER_SYMLINK = "OUTPUT/"; } \ No newline at end of file diff --git a/jf-live-writer/src/ImageBinaryWriter.cpp b/jf-live-writer/src/ImageBinaryWriter.cpp deleted file mode 100644 index c3f70f5..0000000 --- a/jf-live-writer/src/ImageBinaryWriter.cpp +++ /dev/null @@ -1,165 +0,0 @@ -#include "ImageBinaryWriter.hpp" - -#include -#include -#include "date.h" -#include -#include -#include -#include - -#include "BufferUtils.hpp" - -using namespace std; - -ImageBinaryWriter::ImageBinaryWriter( - const string& detector_folder): - detector_folder_(detector_folder), - latest_filename_(detector_folder + "/LATEST"), - current_output_filename_(""), - output_file_fd_(-1) -{ -} - -ImageBinaryWriter::~ImageBinaryWriter() -{ - close_current_file(); -} - -void ImageBinaryWriter::write( - const uint64_t pulse_id, - const BufferBinaryFormat* buffer) -{ - auto current_frame_file = - BufferUtils::get_filename(detector_folder_, module_name_, pulse_id); - - if (current_frame_file != current_output_filename_) { - open_file(current_frame_file); - } - - size_t n_bytes_offset = - BufferUtils::get_file_frame_index(pulse_id) * - sizeof(BufferBinaryFormat); - - auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET); - if (lseek_result < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BufferBinaryWriter::write]"; -
err_msg << " Error while lseek on file "; - err_msg << current_output_filename_; - err_msg << " for n_bytes_offset "; - err_msg << n_bytes_offset << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - auto n_bytes = ::write(output_file_fd_, buffer, sizeof(BufferBinaryFormat)); - if (n_bytes < sizeof(BufferBinaryFormat)) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BufferBinaryWriter::write]"; - err_msg << " Error while writing to file "; - err_msg << current_output_filename_ << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } -} - -void ImageBinaryWriter::open_file(const std::string& filename) -{ - close_current_file(); - - BufferUtils::create_destination_folder(filename); - - output_file_fd_ = ::open(filename.c_str(), O_WRONLY | O_CREAT, - S_IRWXU | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); - if (output_file_fd_ < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BinaryWriter::open_file]"; - err_msg << " Cannot create file "; - err_msg << filename << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - // TODO: Remove context if test successful. - - /** Setting the buffer file size in advance to try to lower the number of - metadata updates on GPFS. */ - { - // TODO: Try instead to use fallocate. 
- if (lseek(output_file_fd_, MAX_FILE_BYTES, SEEK_SET) < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BufferBinaryWriter::open_file]"; - err_msg << " Error while lseek on end of file "; - err_msg << current_output_filename_; - err_msg << " for MAX_FILE_BYTES "; - err_msg << MAX_FILE_BYTES << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - const uint8_t mark = 255; - if(::write(output_file_fd_, &mark, sizeof(mark)) != sizeof(mark)) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BufferBinaryWriter::open_file]"; - err_msg << " Error while writing to file "; - err_msg << current_output_filename_ << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - } - - - current_output_filename_ = filename; -} - -void ImageBinaryWriter::close_current_file() -{ - if (output_file_fd_ != -1) { - if (close(output_file_fd_) < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BufferBinaryWriter::close_current_file]"; - err_msg << " Error while closing file "; - err_msg << current_output_filename_ << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - output_file_fd_ = -1; - - BufferUtils::update_latest_file( - latest_filename_, current_output_filename_); - - current_output_filename_ = ""; - } -} \ No newline at end of file diff --git a/jf-live-writer/src/JFH5Writer.cpp b/jf-live-writer/src/JFH5Writer.cpp new file mode 100644 index 0000000..a9d2a52 --- /dev/null +++ b/jf-live-writer/src/JFH5Writer.cpp @@ -0,0 +1,234 @@ +#include "JFH5Writer.hpp" + +#include +#include +#include + + +#include "live_writer_config.hpp" +#include "buffer_config.hpp" +#include "formats.hpp" + +extern "C" +{ + 
#include "H5DOpublic.h" + #include } + +using namespace std; +using namespace buffer_config; +using namespace live_writer_config; + +JFH5Writer::JFH5Writer(const BufferUtils::DetectorConfig config): + root_folder_(config.buffer_folder), + detector_name_(config.detector_name) +{ +} + +JFH5Writer::~JFH5Writer() +{ + close_file(); +} + +void JFH5Writer::open_run(const int64_t run_id, + const uint32_t n_images, + const uint32_t image_y_size, + const uint32_t image_x_size, + const uint32_t bits_per_pixel) +{ + close_run(); + + const string output_folder = root_folder_ + "/" + OUTPUT_FOLDER_SYMLINK; + // TODO: Maybe add leading zeros to filename? + const string output_file = output_folder + to_string(run_id) + ".h5"; + + current_run_id_ = run_id; + image_y_size_ = image_y_size; + image_x_size_ = image_x_size; + bits_per_pixel_ = bits_per_pixel; + + open_file(output_file, n_images); +} + +void JFH5Writer::close_run() +{ + close_file(); + + current_run_id_ = NO_RUN_ID; + image_y_size_ = 0; + image_x_size_ = 0; + bits_per_pixel_ = 0; +} + +void JFH5Writer::open_file(const string& output_file, const uint32_t n_images) +{ + // Create file + auto fcpl_id = H5Pcreate(H5P_FILE_ACCESS); + if (fcpl_id == -1) { + throw runtime_error("Error in file access property list."); + } + + if (H5Pset_fapl_mpio(fcpl_id, MPI_COMM_WORLD, MPI_INFO_NULL) < 0) { + throw runtime_error("Cannot set mpio to property list."); + } + + file_id_ = H5Fcreate( + output_file.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, fcpl_id); + if (file_id_ < 0) { + throw runtime_error("Cannot create output file."); + } + + H5Pclose(fcpl_id); + + // Create group + auto data_group_id = H5Gcreate(file_id_, detector_name_.c_str(), + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + if (data_group_id < 0) { + throw runtime_error("Cannot create data group."); + } + + // Create image dataset.
+ auto dcpl_id = H5Pcreate(H5P_DATASET_CREATE); + if (dcpl_id < 0) { + throw runtime_error("Error in creating dataset create property list."); + } + + hsize_t image_dataset_chunking[] = {1, image_y_size_, image_x_size_}; + if (H5Pset_chunk(dcpl_id, 3, image_dataset_chunking) < 0) { + throw runtime_error("Cannot set image dataset chunking."); + } + + if (H5Pset_fill_time(dcpl_id, H5D_FILL_TIME_NEVER) < 0) { + throw runtime_error("Cannot set image dataset fill time."); + } + + if (H5Pset_alloc_time(dcpl_id, H5D_ALLOC_TIME_EARLY) < 0) { + throw runtime_error("Cannot set image dataset allocation time."); + } + + hsize_t image_dataset_dims[] = {n_images, image_y_size_, image_x_size_}; + auto image_space_id = H5Screate_simple(3, image_dataset_dims, NULL); + if (image_space_id < 0) { + throw runtime_error("Cannot create image dataset space."); + } + + // TODO: Enable compression. +// bshuf_register_h5filter(); +// uint filter_prop[] = {PIXEL_N_BYTES, BSHUF_H5_COMPRESS_LZ4}; +// if (H5Pset_filter(dcpl_id, BSHUF_H5FILTER, H5Z_FLAG_MANDATORY, +// 2, filter_prop) < 0) { +// throw runtime_error("Cannot set compression filter on dataset."); +// } + + image_dataset_id_ = H5Dcreate( + data_group_id, "data", get_datatype(bits_per_pixel_), + image_space_id, H5P_DEFAULT, dcpl_id, H5P_DEFAULT); + if (image_dataset_id_ < 0) { + throw runtime_error("Cannot create image dataset."); + } + + // Create metadata datasets. 
+ hsize_t meta_dataset_dims[] = {n_images}; + auto meta_space_id = H5Screate_simple(1, meta_dataset_dims, NULL); + if (meta_space_id < 0) { + throw runtime_error("Cannot create meta dataset space."); + } + + auto create_meta_dataset = [&](string name, hid_t data_type) { + auto dataset_id = H5Dcreate( + data_group_id, name.c_str(), data_type, meta_space_id, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + if (dataset_id < 0) { + throw runtime_error("Cannot create " + name + " dataset."); + } + + return dataset_id; + }; + + pulse_dataset_id_ = create_meta_dataset("pulse_id", H5T_NATIVE_UINT64); + frame_dataset_id_ = create_meta_dataset("frame_index", H5T_NATIVE_UINT64); + daq_rec_dataset_id_ = create_meta_dataset("daq_rec", H5T_NATIVE_UINT32); + is_good_dataset_id_ = create_meta_dataset("is_good_frame", H5T_NATIVE_UINT8); + + H5Sclose(meta_space_id); + H5Sclose(image_space_id); + H5Pclose(dcpl_id); + H5Gclose(data_group_id); +} + +void JFH5Writer::close_file() +{ + if (file_id_ < 0) { + return; + } + + H5Dclose(image_dataset_id_); + image_dataset_id_ = -1; + + H5Dclose(pulse_dataset_id_); + pulse_dataset_id_ = -1; + + H5Dclose(frame_dataset_id_); + frame_dataset_id_ = -1; + + H5Dclose(daq_rec_dataset_id_); + daq_rec_dataset_id_ = -1; + + H5Dclose(is_good_dataset_id_); + is_good_dataset_id_ = -1; + + H5Fclose(file_id_); + file_id_ = -1; +} + +void JFH5Writer::write_data( + const int64_t run_id, const uint32_t index, const char* data) +{ + if (run_id != current_run_id_) { + throw runtime_error("Invalid run_id."); + } + +// hsize_t b_i_dims[3] = {BUFFER_BLOCK_SIZE, +// MODULE_Y_SIZE * n_modules_, +// MODULE_X_SIZE}; +// H5::DataSpace b_i_space(3, b_i_dims); +// hsize_t b_i_count[] = {n_images_to_copy, +// MODULE_Y_SIZE * n_modules_, +// MODULE_X_SIZE}; +// hsize_t b_i_start[] = {n_images_offset, 0, 0}; +// b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); +// +// hsize_t f_i_dims[3] = {n_images_, +// MODULE_Y_SIZE * n_modules_, +// MODULE_X_SIZE}; +// 
H5::DataSpace f_i_space(3, f_i_dims); // hsize_t f_i_count[] = {n_images_to_copy, // MODULE_Y_SIZE * n_modules_, // MODULE_X_SIZE}; // hsize_t f_i_start[] = {data_write_index_, 0, 0}; // f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); // // image_dataset_.write( // data, H5::PredType::NATIVE_UINT16, b_i_space, f_i_space); + hsize_t offset[] = {index, 0, 0}; + size_t image_n_bytes = image_y_size_ * image_x_size_ * bits_per_pixel_ / 8; + + H5DOwrite_chunk( + image_dataset_id_, + H5P_DEFAULT, + 0, + offset, + image_n_bytes, + data); +} + +void JFH5Writer::write_meta( + const int64_t run_id, const uint32_t index, const ImageMetadata& meta) +{ + if (run_id != current_run_id_) { + throw runtime_error("Invalid run_id."); + } + + +} diff --git a/jf-live-writer/src/WriterStats.cpp b/jf-live-writer/src/WriterStats.cpp index 1d67947..51a0922 100644 --- a/jf-live-writer/src/WriterStats.cpp +++ b/jf-live-writer/src/WriterStats.cpp @@ -6,9 +6,11 @@ using namespace chrono; WriterStats::WriterStats( const string& detector_name, - const size_t stats_modulo) : + const size_t stats_modulo, + const size_t image_n_bytes) : detector_name_(detector_name), - stats_modulo_(stats_modulo) + stats_modulo_(stats_modulo), + image_n_bytes_(image_n_bytes) { reset_counters(); } @@ -43,19 +45,25 @@ void WriterStats::end_image_write() void WriterStats::print_stats() { - float avg_buffer_write_us = total_buffer_write_us_ / image_counter_; + const float avg_buffer_write_us = total_buffer_write_us_ / image_counter_; - uint64_t timestamp = time_point_cast( + const uint64_t timestamp = time_point_cast( system_clock::now()).time_since_epoch().count(); + const uint64_t avg_throughput = + // bytes -> megabytes + (image_n_bytes_ / 1024 / 1024) / + // micro seconds -> seconds + (avg_buffer_write_us / 1000 / 1000); + // Output in InfluxDB line protocol cout << "jf_buffer_writer"; cout << ",detector_name=" << detector_name_; cout << " "; cout <<
"n_written_images=" << image_counter_ << "i"; cout << " ,avg_buffer_write_us=" << avg_buffer_write_us; - cout << ",max_buffer_write_us=" << max_buffer_write_us_ << "i"; - cout << " "; + cout << " ,max_buffer_write_us=" << max_buffer_write_us_ << "i"; + cout << " ,avg_throughput=" << avg_throughput << " "; cout << timestamp; cout << endl; } diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp index 1b912f8..4d77d29 100644 --- a/jf-live-writer/src/main.cpp +++ b/jf-live-writer/src/main.cpp @@ -4,7 +4,10 @@ #include #include #include "live_writer_config.hpp" -#include "../../jf-buffer-writer/include/BufferStats.hpp" +#include "WriterStats.hpp" +#include "broker_format.hpp" +#include +#include using namespace std; @@ -16,31 +19,70 @@ int main (int argc, char *argv[]) if (argc != 3) { cout << endl; cout << "Usage: jf_live_writer [detector_json_filename]" - " [stream_name]" << endl; + " [bits_per_pixel]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tbits_per_pixel: Number of bits in each pixel."
<< endl; cout << endl; exit(-1); } - const auto stream_name = string(argv[2]); - auto config = BufferUtils::read_json_config(string(argv[1])); + auto const config = BufferUtils::read_json_config(string(argv[1])); + auto const bits_per_pixel = atoi(argv[2]); + + MPI_Init(NULL, NULL); + + int n_writers; + MPI_Comm_size(MPI_COMM_WORLD, &n_writers); + + int i_writer; + MPI_Comm_rank(MPI_COMM_WORLD, &i_writer); auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS); auto receiver = BufferUtils::connect_socket( - ctx, config.detector_name, "assembler"); + ctx, config.detector_name, "broker-agent"); RamBuffer ram_buffer(config.detector_name, config.n_modules); - BufferStats stats(config.detector_name, stream_name, STATS_MODULO); - ImageMetadata meta; + const uint64_t image_n_bytes = + config.image_y_size * config.image_x_size * bits_per_pixel / 8; + + JFH5Writer writer(config); + WriterStats stats(config.detector_name, STATS_MODULO, image_n_bytes); + + StoreStream meta = {}; while (true) { zmq_recv(receiver, &meta, sizeof(meta), 0); - char* data = ram_buffer.read_image(meta.pulse_id); - sender.send(meta, data); + if (meta.op_code == OP_START) { + writer.open_run(meta.run_id, + meta.n_images, + meta.image_y_size, + meta.image_x_size, + meta.bits_per_pixel); + continue; + } - stats.record_stats(meta); + if (meta.op_code == OP_END) { + writer.close_run(); + continue; + } + + // Fair distribution of images among writers. + if (meta.i_image % n_writers == i_writer) { + char* data = ram_buffer.read_image(meta.image_metadata.pulse_id); + + stats.start_image_write(); + writer.write_data(meta.run_id, meta.i_image, data); + stats.end_image_write(); + } + + // Only the first instance writes metadata.
+ if (i_writer == 0) { + writer.write_meta(meta.run_id, meta.i_image, meta.image_metadata); + } } + + MPI_Finalize(); } diff --git a/jf-live-writer/test/CMakeLists.txt b/jf-live-writer/test/CMakeLists.txt index 1079fc2..8f806b0 100644 --- a/jf-live-writer/test/CMakeLists.txt +++ b/jf-live-writer/test/CMakeLists.txt @@ -2,9 +2,7 @@ add_executable(jf-live-writer-tests main.cpp) target_link_libraries(jf-live-writer-tests jf-live-writer-lib - hdf5 - hdf5_hl - hdf5_cpp zmq + rt gtest ) diff --git a/jf-live-writer/test/main.cpp b/jf-live-writer/test/main.cpp index 69b7f53..e819294 100644 --- a/jf-live-writer/test/main.cpp +++ b/jf-live-writer/test/main.cpp @@ -1,7 +1,5 @@ #include "gtest/gtest.h" -#include "test_BinaryReader.cpp" - using namespace std; int main(int argc, char **argv) { diff --git a/jf-live-writer/test/test_BinaryReader.cpp b/jf-live-writer/test/test_BinaryReader.cpp deleted file mode 100644 index cc30157..0000000 --- a/jf-live-writer/test/test_BinaryReader.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include -#include "gtest/gtest.h" - -TEST(BinaryReader, basic_interaction) { - // TODO: Write some real tests. - auto detector_folder = "test_device"; - auto module_name = "M1"; - BinaryReader reader(detector_folder, module_name); -} - diff --git a/scripts/JF01-buffer-worker.sh b/scripts/JF01-buffer-worker.sh index 311471a..a7ae3db 100644 --- a/scripts/JF01-buffer-worker.sh +++ b/scripts/JF01-buffer-worker.sh @@ -8,9 +8,6 @@ fi M=$1 -# Add ourselves to the user cpuset. 
-# echo $$ > /sys/fs/cgroup/cpuset/user/tasks - coreAssociatedBuffer=(12 12 12) initialUDPport=50010 @@ -18,4 +15,4 @@ port=$((${initialUDPport}+10#${M})) DETECTOR=JF01T03V01 N_MODULES=3 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF01-buffer-worker@.service b/scripts/JF01-buffer-worker@.service index b8b6a97..afe7806 100644 --- a/scripts/JF01-buffer-worker@.service +++ b/scripts/JF01-buffer-worker@.service @@ -8,7 +8,7 @@ BindsTo=JF01-buffer.service PermissionsStartOnly=true Type=idle User=root -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF01-buffer-worker.sh %i +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF01-buffer-worker.sh %i TimeoutStartSec=10 RestartSec=10 diff --git a/scripts/JF01-buffer.service b/scripts/JF01-buffer.service index efdc14d..335d8c2 100644 --- a/scripts/JF01-buffer.service +++ b/scripts/JF01-buffer.service @@ -3,7 +3,7 @@ Description=All UDP-buffer instances of JF01 [Service] Type=oneshot -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF01-buffer-worker.sh +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF01-buffer-worker.sh RemainAfterExit=yes [Install] diff --git a/scripts/JF01-stream.service b/scripts/JF01-stream.service index 79e7b1b..58a7d69 100644 --- a/scripts/JF01-stream.service +++ b/scripts/JF01-stream.service @@ -5,10 +5,10 @@ Description=stream service (to streamvis and live analysis) of JF01 PermissionsStartOnly=true Type=idle User=root -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF01-stream.sh +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF01-stream.sh TimeoutStartSec=10 Restart=on-failure -RestartSec=10 +RestartSec=1 [Install] WantedBy=multi-user.target diff --git 
a/scripts/JF01-stream.sh b/scripts/JF01-stream.sh index c7d1724..6341508 100644 --- a/scripts/JF01-stream.sh +++ b/scripts/JF01-stream.sh @@ -1,5 +1,9 @@ #!/bin/bash coreAssociated="24" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF01.json +SERVICE=JF01-stream -taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF01.json +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} diff --git a/scripts/JF01-vis.service b/scripts/JF01-vis.service new file mode 100644 index 0000000..1eb1ac3 --- /dev/null +++ b/scripts/JF01-vis.service @@ -0,0 +1,13 @@ +[Unit] +Description=streamvis: JF01 + +[Service] +User=root +TimeoutStartSec=2 +ExecStart=/bin/bash ./home/dbe/service_scripts/JF01-vis.sh +Restart=on-failure +RestartSec=4 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF01-vis.sh b/scripts/JF01-vis.sh new file mode 100644 index 0000000..d74b688 --- /dev/null +++ b/scripts/JF01-vis.sh @@ -0,0 +1,19 @@ +export PATH=/home/dbe/miniconda3/bin:$PATH + +source /home/dbe/miniconda3/etc/profile.d/conda.sh + +conda deactivate +conda activate vis + +PORT=5001 +PORT_BACKEND=9001 + +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` +BACKEND=${H} + +taskset -c 17,18 \ +streamvis bernina --allow-websocket-origin=${H}:${PORT} \ +--allow-websocket-origin=sf-daq-bernina:${PORT} --port=${PORT} \ +--address tcp://${BACKEND}:${PORT_BACKEND} \ +--page-title 1p5M + diff --git a/scripts/JF02-buffer-worker.sh b/scripts/JF02-buffer-worker.sh index a9fc4c0..1bb35e0 100644 --- a/scripts/JF02-buffer-worker.sh +++ b/scripts/JF02-buffer-worker.sh @@ -8,14 +8,22 @@ fi M=$1 -# Add ourselves to the user cpuset. 
-# echo $$ > /sys/fs/cgroup/cpuset/user/tasks +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` -coreAssociatedBuffer=(39 39 39 40 40 40 41 41 41) +case ${H} in +'sf-daq-4') + coreAssociatedBuffer=(11 12 13 14 15 16 17 18 19) + ;; +'sf-daq-8') + coreAssociatedBuffer=(1 1 1 2 2 2 3 3 3) + ;; +*) + CORES=(25 25 26 26 27 27 28 28 29) +esac initialUDPport=50020 port=$((${initialUDPport}+10#${M})) DETECTOR=JF02T09V02 N_MODULES=9 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF02-buffer-worker@.service b/scripts/JF02-buffer-worker@.service index b175012..55031e4 100644 --- a/scripts/JF02-buffer-worker@.service +++ b/scripts/JF02-buffer-worker@.service @@ -8,9 +8,9 @@ BindsTo=JF02-buffer.service PermissionsStartOnly=true Type=idle User=root -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF02-buffer-worker.sh %i +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF02-buffer-worker.sh %i TimeoutStartSec=10 -RestartSec=10 +RestartSec=1 [Install] WantedBy=JF02-buffer.service diff --git a/scripts/JF02-buffer.service b/scripts/JF02-buffer.service index a3b442c..4302f4a 100644 --- a/scripts/JF02-buffer.service +++ b/scripts/JF02-buffer.service @@ -3,7 +3,7 @@ Description=All UDP-buffer instances of JF02 [Service] Type=oneshot -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF02-buffer-worker.sh +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF02-buffer-worker.sh RemainAfterExit=yes [Install] diff --git a/scripts/JF02-stream.service b/scripts/JF02-stream.service index 27aaaa0..0cd706e 100644 --- a/scripts/JF02-stream.service +++ b/scripts/JF02-stream.service @@ -5,10 +5,10 @@ Description=stream service (to streamvis and live analysis) of JF02 PermissionsStartOnly=true Type=idle 
User=root -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF02-stream.sh +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF02-stream.sh TimeoutStartSec=10 Restart=on-failure -RestartSec=10 +RestartSec=1 [Install] WantedBy=multi-user.target diff --git a/scripts/JF02-stream.sh b/scripts/JF02-stream.sh index f150b3e..7912e94 100644 --- a/scripts/JF02-stream.sh +++ b/scripts/JF02-stream.sh @@ -1,5 +1,22 @@ #!/bin/bash -coreAssociated="17,18,19" +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` +case ${H} in +'sf-daq-4') + coreAssociated="33,34,35" + CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF02.daq4.json + ;; +'sf-daq-8') + coreAssociated="20,21" + CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF02.json + ;; +*) + coreAssociated="12" +esac + +SERVICE=JF02-stream + +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} -taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF02.json diff --git a/scripts/JF02-vis.service b/scripts/JF02-vis.service new file mode 100644 index 0000000..a80fe66 --- /dev/null +++ b/scripts/JF02-vis.service @@ -0,0 +1,13 @@ +[Unit] +Description=streamvis: JF02 + +[Service] +User=root +TimeoutStartSec=2 +ExecStart=/bin/bash ./home/dbe/service_scripts/JF02-vis.sh +Restart=on-failure +RestartSec=4 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF02-vis.sh b/scripts/JF02-vis.sh new file mode 100644 index 0000000..f13f7a8 --- /dev/null +++ b/scripts/JF02-vis.sh @@ -0,0 +1,31 @@ +export PATH=/home/dbe/miniconda3/bin:$PATH + +source /home/dbe/miniconda3/etc/profile.d/conda.sh + +conda deactivate +conda activate vis + +PORT=5002 +PORT_BACKEND=9002 + +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` +BACKEND=${H} + + +case ${H} in +'sf-daq-4') + CORES='36,37' + ;; +'sf-daq-8') + CORES='33,34' + ;; +*) + CORES='2' +esac + +taskset -c ${CORES} \ +streamvis alvra 
--allow-websocket-origin=${H}:${PORT} \ +--allow-websocket-origin=sf-daq-alvra:${PORT} --port=${PORT} \ +--address tcp://${BACKEND}:${PORT_BACKEND} \ +--page-title 4p5M_Alvra + diff --git a/scripts/JF03-buffer-worker.sh b/scripts/JF03-buffer-worker.sh new file mode 100644 index 0000000..00ecbc4 --- /dev/null +++ b/scripts/JF03-buffer-worker.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ $# != 1 ] +then + systemctl start JF03-buffer-worker@00 + exit +fi + +M=$1 + +coreAssociatedBuffer=(12) + +initialUDPport=50030 +port=$((${initialUDPport}+10#${M})) +DETECTOR=JF03T01V01 + +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF03-buffer-worker@.service b/scripts/JF03-buffer-worker@.service new file mode 100644 index 0000000..758aed9 --- /dev/null +++ b/scripts/JF03-buffer-worker@.service @@ -0,0 +1,16 @@ +[Unit] +Description=JF03 UDP2buffer worker instance as a service, instance %i +Requires=JF03-buffer.service +Before=JF03-buffer.service +BindsTo=JF03-buffer.service + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF03-buffer-worker.sh %i +TimeoutStartSec=10 +RestartSec=1 + +[Install] +WantedBy=JF03-buffer.service diff --git a/scripts/JF03-buffer.service b/scripts/JF03-buffer.service new file mode 100644 index 0000000..7e87546 --- /dev/null +++ b/scripts/JF03-buffer.service @@ -0,0 +1,10 @@ +[Unit] +Description=All UDP-buffer instances of JF03 + +[Service] +Type=oneshot +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF03-buffer-worker.sh +RemainAfterExit=yes + +[Install] +WantedBy=multi-user.target diff --git a/scripts/JF03-stream.service b/scripts/JF03-stream.service new file mode 100644 index 0000000..0cce437 --- /dev/null +++ b/scripts/JF03-stream.service @@ -0,0 +1,15 @@ +[Unit] +Description=stream service (to streamvis and live analysis) of JF03 + +[Service] +PermissionsStartOnly=true 
+Type=idle +User=root +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF03-stream.sh +TimeoutStartSec=10 +Restart=on-failure +RestartSec=1 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF03-stream.sh b/scripts/JF03-stream.sh new file mode 100644 index 0000000..62ef433 --- /dev/null +++ b/scripts/JF03-stream.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +coreAssociated="27" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF03.json +SERVICE=JF03-stream + +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} diff --git a/scripts/JF03-vis.service b/scripts/JF03-vis.service new file mode 100644 index 0000000..636e27c --- /dev/null +++ b/scripts/JF03-vis.service @@ -0,0 +1,13 @@ +[Unit] +Description=streamvis: JF03 + +[Service] +User=root +TimeoutStartSec=2 +ExecStart=/bin/bash ./home/dbe/service_scripts/JF03-vis.sh +Restart=on-failure +RestartSec=4 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF03-vis.sh b/scripts/JF03-vis.sh new file mode 100644 index 0000000..9fa8dd4 --- /dev/null +++ b/scripts/JF03-vis.sh @@ -0,0 +1,21 @@ +export PATH=/home/dbe/miniconda3/bin:$PATH + +source /home/dbe/miniconda3/etc/profile.d/conda.sh + +conda deactivate +conda activate vis + +PORT=5003 +PORT_BACKEND=9003 + +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` +BACKEND=${H} + +CORES=39 + +taskset -c ${CORES} \ +streamvis bernina --allow-websocket-origin=${H}:${PORT} \ +--allow-websocket-origin=sf-daq-bernina:${PORT} --port=${PORT} \ +--address tcp://${BACKEND}:${PORT_BACKEND} \ +--page-title I0 + diff --git a/scripts/JF04-buffer-worker.sh b/scripts/JF04-buffer-worker.sh new file mode 100644 index 0000000..e0be303 --- /dev/null +++ b/scripts/JF04-buffer-worker.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ $# != 1 ] +then + systemctl start JF04-buffer-worker@00 + exit +fi + +M=$1 + +coreAssociatedBuffer=(12) + +initialUDPport=50040 +port=$((${initialUDPport}+10#${M})) 
+DETECTOR=JF04T01V01 + +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF04-buffer-worker@.service b/scripts/JF04-buffer-worker@.service new file mode 100644 index 0000000..b913054 --- /dev/null +++ b/scripts/JF04-buffer-worker@.service @@ -0,0 +1,16 @@ +[Unit] +Description=JF04 UDP2buffer worker instance as a service, instance %i +Requires=JF04-buffer.service +Before=JF04-buffer.service +BindsTo=JF04-buffer.service + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF04-buffer-worker.sh %i +TimeoutStartSec=10 +RestartSec=1 + +[Install] +WantedBy=JF04-buffer.service diff --git a/scripts/JF04-buffer.service b/scripts/JF04-buffer.service new file mode 100644 index 0000000..6bfdadb --- /dev/null +++ b/scripts/JF04-buffer.service @@ -0,0 +1,10 @@ +[Unit] +Description=All UDP-buffer instances of JF04 + +[Service] +Type=oneshot +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF04-buffer-worker.sh +RemainAfterExit=yes + +[Install] +WantedBy=multi-user.target diff --git a/scripts/JF04-stream.service b/scripts/JF04-stream.service new file mode 100644 index 0000000..8ffd01a --- /dev/null +++ b/scripts/JF04-stream.service @@ -0,0 +1,15 @@ +[Unit] +Description=stream service (to streamvis and live analysis) of JF04 + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF04-stream.sh +TimeoutStartSec=10 +Restart=on-failure +RestartSec=1 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF04-stream.sh b/scripts/JF04-stream.sh new file mode 100644 index 0000000..e8690d2 --- /dev/null +++ b/scripts/JF04-stream.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +coreAssociated="27" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF04.json +SERVICE=JF04-stream + +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + 
+taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} diff --git a/scripts/JF04-vis.service b/scripts/JF04-vis.service new file mode 100644 index 0000000..78ae2bb --- /dev/null +++ b/scripts/JF04-vis.service @@ -0,0 +1,13 @@ +[Unit] +Description=streamvis: JF04 + +[Service] +User=root +TimeoutStartSec=2 +ExecStart=/bin/bash ./home/dbe/service_scripts/JF04-vis.sh +Restart=on-failure +RestartSec=4 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF04-vis.sh b/scripts/JF04-vis.sh new file mode 100644 index 0000000..72478a8 --- /dev/null +++ b/scripts/JF04-vis.sh @@ -0,0 +1,21 @@ +export PATH=/home/dbe/miniconda3/bin:$PATH + +source /home/dbe/miniconda3/etc/profile.d/conda.sh + +conda deactivate +conda activate vis + +PORT=5004 +PORT_BACKEND=9004 + +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` +BACKEND=${H} + +CORES=39 + +taskset -c ${CORES} \ +streamvis bernina --allow-websocket-origin=${H}:${PORT} \ +--allow-websocket-origin=sf-daq-bernina:${PORT} --port=${PORT} \ +--address tcp://${BACKEND}:${PORT_BACKEND} \ +--page-title Fluorescence + diff --git a/scripts/JF06-buffer-worker.sh b/scripts/JF06-buffer-worker.sh index 731ac88..3124952 100644 --- a/scripts/JF06-buffer-worker.sh +++ b/scripts/JF06-buffer-worker.sh @@ -8,14 +8,11 @@ fi M=$1 -# Add ourselves to the user cpuset. 
-# echo $$ > /sys/fs/cgroup/cpuset/user/tasks - -coreAssociatedBuffer=(22 22 23 23 24 24 25 25 26 26 27 27 28 28 29 29 30 30 31 31 32 32 33 33 34 34 35 35 36 36 37 37) +coreAssociatedBuffer=(4 4 4 4 5 5 5 5 6 6 6 6 7 7 7 7 8 8 8 8 9 9 9 9 10 10 10 10 11 11 11 11) initialUDPport=50060 port=$((${initialUDPport}+10#${M})) DETECTOR=JF06T32V02 N_MODULES=32 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF06-buffer-worker@.service b/scripts/JF06-buffer-worker@.service index 44ab481..c0380f1 100644 --- a/scripts/JF06-buffer-worker@.service +++ b/scripts/JF06-buffer-worker@.service @@ -8,9 +8,9 @@ BindsTo=JF06-buffer.service PermissionsStartOnly=true Type=idle User=root -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF06-buffer-worker.sh %i +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF06-buffer-worker.sh %i TimeoutStartSec=10 -RestartSec=10 +RestartSec=1 [Install] WantedBy=JF06-buffer.service diff --git a/scripts/JF06-buffer.service b/scripts/JF06-buffer.service index eba84ca..4195d8e 100644 --- a/scripts/JF06-buffer.service +++ b/scripts/JF06-buffer.service @@ -3,7 +3,7 @@ Description=All UDP-buffer instances of JF06 [Service] Type=oneshot -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF06-buffer-worker.sh +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF06-buffer-worker.sh RemainAfterExit=yes [Install] diff --git a/scripts/JF06-stream.service b/scripts/JF06-stream.service index 9f6d3b6..d4fdd80 100644 --- a/scripts/JF06-stream.service +++ b/scripts/JF06-stream.service @@ -5,10 +5,10 @@ Description=stream service (to streamvis and live analysis) of JF06 PermissionsStartOnly=true Type=idle User=root -ExecStart=/usr/bin/sh 
/home/writer/git/sf_daq_buffer/scripts/JF06-stream.sh +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF06-stream.sh TimeoutStartSec=10 Restart=on-failure -RestartSec=10 +RestartSec=1 [Install] WantedBy=multi-user.target diff --git a/scripts/JF06-stream.sh b/scripts/JF06-stream.sh index 6d39268..03d6adf 100644 --- a/scripts/JF06-stream.sh +++ b/scripts/JF06-stream.sh @@ -1,5 +1,9 @@ #!/bin/bash -coreAssociated="13,14,15,16" +coreAssociated="22,23,24" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF06.json +SERVICE=JF06-stream -taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF06.json +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} diff --git a/scripts/JF06-vis.service b/scripts/JF06-vis.service new file mode 100644 index 0000000..ff6b509 --- /dev/null +++ b/scripts/JF06-vis.service @@ -0,0 +1,13 @@ +[Unit] +Description=streamvis: JF06 + +[Service] +User=root +TimeoutStartSec=2 +ExecStart=/bin/bash ./home/dbe/service_scripts/JF06-vis.sh +Restart=on-failure +RestartSec=4 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF06-vis.sh b/scripts/JF06-vis.sh new file mode 100644 index 0000000..a3050f9 --- /dev/null +++ b/scripts/JF06-vis.sh @@ -0,0 +1,20 @@ +export PATH=/home/dbe/miniconda3/bin:$PATH + +source /home/dbe/miniconda3/etc/profile.d/conda.sh + +conda deactivate +conda activate vis + +PORT=5006 +PORT_BACKEND=9006 + +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` +BACKEND=${H} + +CORES="35,36" + +taskset -c ${CORES} \ +streamvis default16m --allow-websocket-origin=${H}:${PORT} --allow-websocket-origin=sf-daq-alvra:${PORT} \ +--port=${PORT} --address tcp://${BACKEND}:${PORT_BACKEND} \ +--page-title 16M_Jungfrau_Alvra + diff --git a/scripts/JF06_4M-buffer-worker.sh b/scripts/JF06_4M-buffer-worker.sh index 6a6e924..846f5eb 100644 --- a/scripts/JF06_4M-buffer-worker.sh +++ 
b/scripts/JF06_4M-buffer-worker.sh @@ -2,20 +2,17 @@ if [ $# != 1 ] then - systemctl start JF06_4M-buffer-worker@{00..31} + systemctl start JF06_4M-buffer-worker@{00..07} exit fi M=$1 -# Add ourselves to the user cpuset. -# echo $$ > /sys/fs/cgroup/cpuset/user/tasks - -coreAssociatedBuffer=(22 23 24 25 26 27 28 29) +coreAssociatedBuffer=(4 5 6 7 8 9 10 11) initialUDPport=50060 port=$((${initialUDPport}+10#${M})) -DETECTOR=JF06T08V01 +DETECTOR=JF06T08V02 N_MODULES=8 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF06_4M-buffer-worker@.service b/scripts/JF06_4M-buffer-worker@.service index f83f2c0..9960018 100644 --- a/scripts/JF06_4M-buffer-worker@.service +++ b/scripts/JF06_4M-buffer-worker@.service @@ -8,9 +8,9 @@ BindsTo=JF06_4M-buffer.service PermissionsStartOnly=true Type=idle User=root -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF06_4M-buffer-worker.sh %i +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF06_4M-buffer-worker.sh %i TimeoutStartSec=10 -RestartSec=10 +RestartSec=1 [Install] WantedBy=JF06_4M-buffer.service diff --git a/scripts/JF06_4M-buffer.service b/scripts/JF06_4M-buffer.service index a3116e8..41d5610 100644 --- a/scripts/JF06_4M-buffer.service +++ b/scripts/JF06_4M-buffer.service @@ -3,7 +3,7 @@ Description=All UDP-buffer instances of JF06(4M mode) [Service] Type=oneshot -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF06_4M-buffer-worker.sh +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF06_4M-buffer-worker.sh RemainAfterExit=yes [Install] diff --git a/scripts/JF06_4M-stream.service b/scripts/JF06_4M-stream.service index 4526f8f..bd332ab 100644 --- a/scripts/JF06_4M-stream.service +++ b/scripts/JF06_4M-stream.service @@ -5,10 
+5,10 @@ Description=stream service (to streamvis and live analysis) of JF06 (4M mode) PermissionsStartOnly=true Type=idle User=root -ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF06_4M-stream.sh +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF06_4M-stream.sh TimeoutStartSec=10 Restart=on-failure -RestartSec=10 +RestartSec=1 [Install] WantedBy=multi-user.target diff --git a/scripts/JF06_4M-stream.sh b/scripts/JF06_4M-stream.sh index c5359b5..7896ef8 100644 --- a/scripts/JF06_4M-stream.sh +++ b/scripts/JF06_4M-stream.sh @@ -1,5 +1,10 @@ #!/bin/bash -coreAssociated="13,14,15,16" +coreAssociated="20,21" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF06_4M.daq8.json +SERVICE=JF06_4M-stream + +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} -taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF06_4M.json diff --git a/scripts/JF07-stream.sh b/scripts/JF07-stream.sh index bcaf1d4..48a0e6b 100644 --- a/scripts/JF07-stream.sh +++ b/scripts/JF07-stream.sh @@ -1,5 +1,9 @@ #!/bin/bash coreAssociated="20,21,22,23" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF07.json +SERVICE=JF07-stream -taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF07.json +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} diff --git a/scripts/JF09-buffer-worker.sh b/scripts/JF09-buffer-worker.sh new file mode 100644 index 0000000..49f6013 --- /dev/null +++ b/scripts/JF09-buffer-worker.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ $# != 1 ] +then + systemctl start JF09-buffer-worker@00 + exit +fi + +M=$1 + +coreAssociatedBuffer=(12) + +initialUDPport=50150 +port=$((${initialUDPport}+10#${M})) +DETECTOR=JF09T01V01 + +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer 
${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF09-buffer-worker@.service b/scripts/JF09-buffer-worker@.service new file mode 100644 index 0000000..b199c34 --- /dev/null +++ b/scripts/JF09-buffer-worker@.service @@ -0,0 +1,16 @@ +[Unit] +Description=JF09 UDP2buffer worker instance as a service, instance %i +Requires=JF09-buffer.service +Before=JF09-buffer.service +BindsTo=JF09-buffer.service + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF09-buffer-worker.sh %i +TimeoutStartSec=10 +RestartSec=1 + +[Install] +WantedBy=JF09-buffer.service diff --git a/scripts/JF09-buffer.service b/scripts/JF09-buffer.service new file mode 100644 index 0000000..1423a8f --- /dev/null +++ b/scripts/JF09-buffer.service @@ -0,0 +1,10 @@ +[Unit] +Description=All UDP-buffer instances of JF09 + +[Service] +Type=oneshot +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF09-buffer-worker.sh +RemainAfterExit=yes + +[Install] +WantedBy=multi-user.target diff --git a/scripts/JF09-stream.service b/scripts/JF09-stream.service new file mode 100644 index 0000000..3dc6548 --- /dev/null +++ b/scripts/JF09-stream.service @@ -0,0 +1,15 @@ +[Unit] +Description=stream service (to streamvis and live analysis) of JF09 + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF09-stream.sh +TimeoutStartSec=10 +Restart=on-failure +RestartSec=1 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF09-stream.sh b/scripts/JF09-stream.sh new file mode 100644 index 0000000..d05d2be --- /dev/null +++ b/scripts/JF09-stream.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +coreAssociated="25" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF09.json +SERVICE=JF09-stream + +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} diff --git 
a/scripts/JF09-vis.service b/scripts/JF09-vis.service new file mode 100644 index 0000000..ed7815e --- /dev/null +++ b/scripts/JF09-vis.service @@ -0,0 +1,13 @@ +[Unit] +Description=streamvis: JF09 + +[Service] +User=root +TimeoutStartSec=2 +ExecStart=/bin/bash ./home/dbe/service_scripts/JF09-vis.sh +Restart=on-failure +RestartSec=4 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF09-vis.sh b/scripts/JF09-vis.sh new file mode 100644 index 0000000..3b7eeb7 --- /dev/null +++ b/scripts/JF09-vis.sh @@ -0,0 +1,21 @@ +export PATH=/home/dbe/miniconda3/bin:$PATH + +source /home/dbe/miniconda3/etc/profile.d/conda.sh + +conda deactivate +conda activate vis + +PORT=5009 +PORT_BACKEND=9009 + +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` +BACKEND=${H} + +CORES=37 + +taskset -c ${CORES} \ +streamvis alvra --allow-websocket-origin=${H}:${PORT} \ +--allow-websocket-origin=sf-daq-alvra:${PORT} --port=${PORT} \ +--address tcp://${BACKEND}:${PORT_BACKEND} \ +--page-title FLEX:Normal + diff --git a/scripts/JF10-buffer-worker.sh b/scripts/JF10-buffer-worker.sh new file mode 100644 index 0000000..b17dac8 --- /dev/null +++ b/scripts/JF10-buffer-worker.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ $# != 1 ] +then + systemctl start JF10-buffer-worker@00 + exit +fi + +M=$1 + +coreAssociatedBuffer=(12) + +initialUDPport=50160 +port=$((${initialUDPport}+10#${M})) +DETECTOR=JF10T01V01 + +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF10-buffer-worker@.service b/scripts/JF10-buffer-worker@.service new file mode 100644 index 0000000..508c60e --- /dev/null +++ b/scripts/JF10-buffer-worker@.service @@ -0,0 +1,16 @@ +[Unit] +Description=JF10 UDP2buffer worker instance as a service, instance %i +Requires=JF10-buffer.service +Before=JF10-buffer.service +BindsTo=JF10-buffer.service + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh 
/home/dbe/service_scripts/JF10-buffer-worker.sh %i +TimeoutStartSec=10 +RestartSec=1 + +[Install] +WantedBy=JF10-buffer.service diff --git a/scripts/JF10-buffer.service b/scripts/JF10-buffer.service new file mode 100644 index 0000000..5a820de --- /dev/null +++ b/scripts/JF10-buffer.service @@ -0,0 +1,10 @@ +[Unit] +Description=All UDP-buffer instances of JF10 + +[Service] +Type=oneshot +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF10-buffer-worker.sh +RemainAfterExit=yes + +[Install] +WantedBy=multi-user.target diff --git a/scripts/JF10-stream.service b/scripts/JF10-stream.service new file mode 100644 index 0000000..786222f --- /dev/null +++ b/scripts/JF10-stream.service @@ -0,0 +1,15 @@ +[Unit] +Description=stream service (to streamvis and live analysis) of JF10 + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF10-stream.sh +TimeoutStartSec=10 +Restart=on-failure +RestartSec=1 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF10-stream.sh b/scripts/JF10-stream.sh new file mode 100644 index 0000000..b73b2df --- /dev/null +++ b/scripts/JF10-stream.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +coreAssociated="26" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF10.json +SERVICE=JF10-stream + +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} diff --git a/scripts/JF10-vis.service b/scripts/JF10-vis.service new file mode 100644 index 0000000..b08cb69 --- /dev/null +++ b/scripts/JF10-vis.service @@ -0,0 +1,13 @@ +[Unit] +Description=streamvis: JF10 + +[Service] +User=root +TimeoutStartSec=2 +ExecStart=/bin/bash ./home/dbe/service_scripts/JF10-vis.sh +Restart=on-failure +RestartSec=4 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF10-vis.sh b/scripts/JF10-vis.sh new file mode 100644 index 0000000..fe77449 --- /dev/null +++ b/scripts/JF10-vis.sh @@ -0,0 +1,21 @@ +export 
PATH=/home/dbe/miniconda3/bin:$PATH + +source /home/dbe/miniconda3/etc/profile.d/conda.sh + +conda deactivate +conda activate vis + +PORT=5010 +PORT_BACKEND=9010 + +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` +BACKEND=${H} + +CORES=38 + +taskset -c ${CORES} \ +streamvis alvra --allow-websocket-origin=${H}:${PORT} \ +--allow-websocket-origin=sf-daq-alvra:${PORT} --port=${PORT} \ +--address tcp://${BACKEND}:${PORT_BACKEND} \ +--page-title FLEX:Stripsel + diff --git a/scripts/JF11-buffer-worker.sh b/scripts/JF11-buffer-worker.sh new file mode 100644 index 0000000..39dccca --- /dev/null +++ b/scripts/JF11-buffer-worker.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ $# != 1 ] +then + systemctl start JF11-buffer-worker@{00..03} + exit +fi + +M=$1 + +coreAssociatedBuffer=(11 12 13 1) + +initialUDPport=50170 +port=$((${initialUDPport}+10#${M})) +DETECTOR=JF11T04V01 + +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF11-buffer-worker@.service b/scripts/JF11-buffer-worker@.service new file mode 100644 index 0000000..0c19154 --- /dev/null +++ b/scripts/JF11-buffer-worker@.service @@ -0,0 +1,16 @@ +[Unit] +Description=JF11 UDP2buffer worker instance as a service, instance %i +Requires=JF11-buffer.service +Before=JF11-buffer.service +BindsTo=JF11-buffer.service + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF11-buffer-worker.sh %i +TimeoutStartSec=10 +RestartSec=1 + +[Install] +WantedBy=JF11-buffer.service diff --git a/scripts/JF11-buffer.service b/scripts/JF11-buffer.service new file mode 100644 index 0000000..8730947 --- /dev/null +++ b/scripts/JF11-buffer.service @@ -0,0 +1,10 @@ +[Unit] +Description=All UDP-buffer instances of JF11 + +[Service] +Type=oneshot +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF11-buffer-worker.sh +RemainAfterExit=yes + +[Install] +WantedBy=multi-user.target diff --git 
a/scripts/JF11-stream.service b/scripts/JF11-stream.service new file mode 100644 index 0000000..4743c94 --- /dev/null +++ b/scripts/JF11-stream.service @@ -0,0 +1,15 @@ +[Unit] +Description=stream service (to streamvis and live analysis) of JF11(TXS Flex) + +[Service] +PermissionsStartOnly=true +Type=idle +User=root +ExecStart=/usr/bin/sh /home/dbe/service_scripts/JF11-stream.sh +TimeoutStartSec=10 +Restart=on-failure +RestartSec=1 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF11-stream.sh b/scripts/JF11-stream.sh new file mode 100644 index 0000000..93db1d6 --- /dev/null +++ b/scripts/JF11-stream.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +coreAssociated="14,15,16" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF11.json +SERVICE=JF11-stream + +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} diff --git a/scripts/JF11-vis.service b/scripts/JF11-vis.service new file mode 100644 index 0000000..82a1698 --- /dev/null +++ b/scripts/JF11-vis.service @@ -0,0 +1,13 @@ +[Unit] +Description=streamvis: JF11 + +[Service] +User=root +TimeoutStartSec=2 +ExecStart=/bin/bash ./home/dbe/service_scripts/JF11-vis.sh +Restart=on-failure +RestartSec=4 + +[Install] +WantedBy=multi-user.target + diff --git a/scripts/JF11-vis.sh b/scripts/JF11-vis.sh new file mode 100644 index 0000000..22b2b14 --- /dev/null +++ b/scripts/JF11-vis.sh @@ -0,0 +1,18 @@ +export PATH=/home/dbe/miniconda3/bin:$PATH + +source /home/dbe/miniconda3/etc/profile.d/conda.sh + +conda deactivate +conda activate vis + +PORT=5011 +PORT_BACKEND=9011 + +H=`echo ${HOSTNAME} | sed 's/.psi.ch//'` +BACKEND=${H} + +taskset -c 17,18 \ +streamvis alvra --allow-websocket-origin=${H}:${PORT} \ +--allow-websocket-origin=sf-daq-alvra:${PORT} --port=${PORT} \ +--address tcp://${BACKEND}:${PORT_BACKEND} \ +--page-title TXS_Flex diff --git a/scripts/JF13-stream.sh b/scripts/JF13-stream.sh index 3b8e226..4489505 100644 
--- a/scripts/JF13-stream.sh +++ b/scripts/JF13-stream.sh @@ -1,5 +1,9 @@ #!/bin/bash coreAssociated="25" +CONFIG=/gpfs/photonics/swissfel/buffer/config/stream-JF13.json +SERVICE=JF13-stream -taskset -c ${coreAssociated} /usr/bin/sf_stream /gpfs/photonics/swissfel/buffer/config/stream-JF13.json +/home/dbe/git/sf_daq_buffer/scripts/check_config_changed.sh ${CONFIG} ${SERVICE} & + +taskset -c ${coreAssociated} /usr/local/bin/sf_stream ${CONFIG} diff --git a/scripts/check_config_changed.sh b/scripts/check_config_changed.sh new file mode 100755 index 0000000..bf6b0fe --- /dev/null +++ b/scripts/check_config_changed.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +F=$1 +S=$2 + +t=`stat -c %y $F` + +while true +do + sleep 5 + t1=`stat -c %y $F` + + if [ "${t1}" != "${t}" ] + then + echo $F changed + t=${t1} + systemctl restart ${S} + fi + +done + diff --git a/scripts/clean_buffer.cron b/scripts/clean_buffer.cron new file mode 100644 index 0000000..b90262b --- /dev/null +++ b/scripts/clean_buffer.cron @@ -0,0 +1,3 @@ +# check every hour if buffer is occupied for larger than 80% and remove all files older than 5 hours +10 * * * * root /home/dbe/git/sf_daq_buffer/scripts/delete_old_files_in_buffer.sh 80 5 + diff --git a/scripts/copy_pedestal_file.py b/scripts/copy_pedestal_file.py new file mode 100644 index 0000000..4235d63 --- /dev/null +++ b/scripts/copy_pedestal_file.py @@ -0,0 +1,44 @@ +import argparse +import json +import os +import datetime +from shutil import copyfile + +PEDESTAL_DIRECTORY="/sf/jungfrau/data/pedestal" + +parser = argparse.ArgumentParser() + +parser.add_argument("file_pedestal", type=str) +parser.add_argument("json_run", type=str) +parser.add_argument("detector", type=str) +parser.add_argument("json_stream", type=str) + +args = parser.parse_args() + +with open(args.json_run, "r") as run_file: + data = json.load(run_file) + + request_time=datetime.datetime.strptime(data["request_time"], '%Y-%m-%d %H:%M:%S.%f') + + if not
os.path.isdir(f'{PEDESTAL_DIRECTORY}/{args.detector}'): + os.mkdir(f'{PEDESTAL_DIRECTORY}/{args.detector}') + + out_name = f'{PEDESTAL_DIRECTORY}/{args.detector}/{request_time.strftime("%Y%m%d_%H%M%S")}.h5' + copyfile(args.file_pedestal, out_name) + + print(f'Copied resulting pedestal file {args.file_pedestal} to {out_name}') + + if not os.path.exists(args.json_stream): + print(f'stream file {args.json_stream} does not exists, exiting') + exit() + + with open(args.json_stream, "r") as stream_file: + det = json.load(stream_file) + + print(f'Changing in stream file {args.json_stream} pedestal from {det["pedestal_file"]} to {out_name}') + + det["pedestal_file"] = out_name + + with open(args.json_stream, "w") as write_file: + json.dump(det, write_file, indent=4) + diff --git a/scripts/delete_old_files_in_buffer.sh b/scripts/delete_old_files_in_buffer.sh index ddcfed1..e461708 100755 --- a/scripts/delete_old_files_in_buffer.sh +++ b/scripts/delete_old_files_in_buffer.sh @@ -3,11 +3,11 @@ hours=5 threshold=80 -if [ $# = 1 ] +if [ $# -ge 1 ] then threshold=$1 fi -if [ $# = 2 ] +if [ $# -eq 2 ] then hours=$2 fi diff --git a/scripts/export_file.py b/scripts/export_file.py index b0deccd..52d6224 100644 --- a/scripts/export_file.py +++ b/scripts/export_file.py @@ -7,6 +7,12 @@ import numpy as np import jungfrau_utils as ju +import sys +sys.path.append('/home/dbe/git/sf_daq_buffer/scripts') +import postprocess_raw + +import os + parser = argparse.ArgumentParser() parser.add_argument("file_in", type=str) @@ -27,8 +33,9 @@ with open(args.json_run, "r") as run_file: data = json.load(run_file) detector_params = data["detectors"][detector_name] - compression = detector_params.get("compression", True) - conversion = detector_params.get("adc_to_energy", True) + compression = detector_params.get("compression", False) + conversion = detector_params.get("adc_to_energy", False) + disabled_modules = detector_params.get("disabled_modules", []) if conversion: mask = 
detector_params.get("mask", True) mask_double_pixels = detector_params.get("mask_double_pixels", True) @@ -46,45 +53,55 @@ if not mask and mask_double_pixels: warnings.warn("mask_double_pixels set to False") mask_double_pixels = False -with ju.File( - args.file_in, - gain_file=gain_file, - pedestal_file=pedestal_file, - conversion=conversion, - mask=mask, - gap_pixels=gap_pixels, - geometry=geometry, - parallel=False, -) as juf: - n_input_frames = len(juf["data"]) - good_frames = np.nonzero(juf["is_good_frame"])[0] - n_output_frames = len(good_frames) - - juf.handler.mask_double_pixels = mask_double_pixels - juf.export( - args.file_out, - index=good_frames, - roi=None, - compression=compression, - factor=factor, - dtype=None, - batch_size=35, - ) - - pixel_mask = juf.handler.get_pixel_mask(gap_pixels=gap_pixels, geometry=geometry) - -# Postprocessing -with h5py.File(args.file_out, "r+") as h5f: - h5f[f"/data/{detector_name}/pixel_mask"] = np.invert(pixel_mask) +file_tmp = args.file_in +if len(disabled_modules)>0: + print(f"Will reduce data file, disabled_modules: {disabled_modules}") if conversion: - print("daq_rec:", h5f[f"/data/{detector_name}/daq_rec"][0, 0]) - del h5f[f"/data/{detector_name}/daq_rec"] + file_tmp = args.file_out+".tmp" + else: + file_tmp = args.file_out + postprocess_raw.postprocess_raw(args.file_in, file_tmp, compression=compression, disabled_modules=disabled_modules) - frame_index = h5f[f"/data/{detector_name}/frame_index"][:] - print("frame_index range:", (np.min(frame_index), np.max(frame_index))) - del h5f[f"/data/{detector_name}/frame_index"] +if conversion: - del h5f[f"/data/{detector_name}/is_good_frame"] + with ju.File( + file_tmp, + gain_file=gain_file, + pedestal_file=pedestal_file, + conversion=conversion, + mask=mask, + gap_pixels=gap_pixels, + geometry=geometry, + parallel=False, + ) as juf: + n_input_frames = len(juf["data"]) + good_frames = np.nonzero(juf["is_good_frame"])[0] + n_output_frames = len(good_frames) + + 
juf.handler.mask_double_pixels = mask_double_pixels + juf.export( + args.file_out, + index=good_frames, + roi=None, + compression=compression, + factor=factor, + dtype=None, + batch_size=35, + ) + os.remove(file_tmp) + +else: + with h5py.File(file_tmp, "r") as juf: + n_input_frames = len(juf[f"data/{detector_name}/data"]) + good_frames = np.nonzero(juf[f"data/{detector_name}/is_good_frame"])[0] + n_output_frames = len(good_frames) + +# Utility info +with h5py.File(args.file_out, "r") as h5f: + print("daq_rec:", h5f[f"/data/{detector_name}/daq_rec"][0, 0]) + + frame_index = h5f[f"/data/{detector_name}/frame_index"][:] + print("frame_index range:", (np.min(frame_index), np.max(frame_index))) print("input frames:", n_input_frames) print("bad frames:", n_input_frames - n_output_frames) @@ -99,4 +116,3 @@ print("geometry:", geometry) print("gap_pixels:", gap_pixels) print("compression:", compression) print("factor:", factor) - diff --git a/scripts/jungfrau_create_pedestals.py b/scripts/jungfrau_create_pedestals.py new file mode 100644 index 0000000..1dcb9e4 --- /dev/null +++ b/scripts/jungfrau_create_pedestals.py @@ -0,0 +1,225 @@ +import argparse +import sys +import os +import numpy as np +import h5py +import logging + +ch = logging.StreamHandler() +ch.setFormatter(logging.Formatter('[%(levelname)s] %(message)s')) + +log = logging.getLogger("create_pedestals") +log.addHandler(ch) + + +def h5_printname(name): + print(" {}".format(name)) + + +def forcedGainValue(i, n0, n1, n2, n3): + if i <= n0 - 1: + return 0 + if i <= (n0 + n1) - 1: + return 1 + if i <= (n0 + n1 + n2) - 1: + return 3 + if i <= (n0 + n1 + n2 + n3) - 1: + return 4 + return 2 + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--verbosity", default=None, help="log verbosity level INFO/DEBUG/WARN/ERROR/CRITICAL") + parser.add_argument("--filename", default="pedestal.h5", help="pedestal file") + parser.add_argument("--X_test_pixel", type=int, default=0, help="x position of the test 
pixel") + parser.add_argument("--Y_test_pixel", type=int, default=0, help="y position of the test pixel") + parser.add_argument("--nFramesPede", type=int, default=1000, help="number of pedestal frames to average pedestal value") + parser.add_argument("--frames_G0", type=int, default=0, help="force to treat pedestal run as first frames_G0 taken in gain0, then frames_G1 in gain1, and frames_G2 in gain2 and HG0") + parser.add_argument("--frames_G1", type=int, default=0, help="force to treat pedestal run as first frames_G0 taken in gain0, then frames_G1 in gain1, and frames_G2 in gain2 and HG0") + parser.add_argument("--frames_G2", type=int, default=0, help="force to treat pedestal run as first frames_G0 taken in gain0, then frames_G1 in gain1, and frames_G2 in gain2 and HG0") + parser.add_argument("--frames_HG0", type=int, default=0, help="force to treat pedestal run as first frames_G0 taken in gain0, then frames_G1 in gain1, and frames_G2 in gain2 and HG0") + parser.add_argument("--number_frames", type=int, default=1000000, help="analyze only first number_frames frames") + parser.add_argument("--frames_average", type=int, default=1000, help="for pedestal in each gain average over last frames_average frames, reducing weight of previous") + parser.add_argument("--directory", default="./", help="Output directory where to store pixelmask and gain file") + parser.add_argument("--gain_check", type=int, default=1, help="check that gain setting in each of the module corresponds to the general gain switch, (0 - dont check)") + parser.add_argument("--add_pixel_mask", default=None, help="add additional masked pixels from external, specified file") + parser.add_argument("--number_bad_modules", type=int, default=0, help="Number of bad modules in detector") + args = parser.parse_args() + + if not (os.path.isfile(args.filename) and os.access(args.filename, os.R_OK)): + print("Pedestal file {} not found, exit".format(args.filename)) + exit() + + if args.verbosity: + 
log.setLevel(getattr(logging, args.verbosity.upper(), None))
+
+ overwriteGain = False
+ if (args.frames_G0 + args.frames_G1 + args.frames_G2) > 0:
+ log.info("Treat this run as taken with {} frames in gain0, then {} frames in gain1 and {} frames in gain2".format(args.frames_G0, args.frames_G1, args.frames_G2))
+ overwriteGain = True
+
+ f = h5py.File(args.filename, "r")
+
+ #detector_name = (f.get("general/detector_name").value).decode('UTF-8')
+ detector_name = (f.get("general/detector_name")[()]).decode('UTF-8')
+ #n_bad_modules = f.get("general/n_bad_modules").value
+ n_bad_modules = args.number_bad_modules
+
+ data_location = "data/" + detector_name + "/data"
+ daq_recs_location = "data/" + detector_name + "/daq_rec"
+ is_good_frame_location = "data/" + detector_name + "/is_good_frame"
+
+
+ numberOfFrames = len(f[data_location])
+ (sh_y, sh_x) = f[data_location][0].shape
+ nModules = (sh_x * sh_y) // (1024 * 512)
+ if (nModules * 1024 * 512) != (sh_x * sh_y):
+ log.error(" {} : Something very strange in the data, Jungfrau consists of (1024x512) modules, while data has {}x{}".format(detector_name, sh_x, sh_y))
+ exit()
+
+ (tX, tY) = (args.X_test_pixel, args.Y_test_pixel)
+ if tX < 0 or tX > (sh_x - 1):
+ tX = 0
+ if tY < 0 or tY > (sh_y - 1):
+ tY = 0
+
+ log.debug(" {} : test pixel is ( x y ): {}x{}".format(detector_name, tX, tY))
+ log.info(" {} : In pedestal file {} there are {} frames".format(detector_name, args.filename, numberOfFrames + 1))
+# log.debug("Following groups are available:")
+# if args.verbosity >= 3:
+# f.visit(h5_printname)
+ log.debug(" {} : data has the following shape: {}, type: {}, {} modules ({} bad modules)".format(detector_name, f[data_location][0].shape, f[data_location][0].dtype, nModules, n_bad_modules))
+
+ pixelMask = np.zeros((sh_y, sh_x), dtype=int)
+
+ adcValuesN = np.zeros((5, sh_y, sh_x))
+ adcValuesNN = np.zeros((5, sh_y, sh_x))
+
+
+ averagePedestalFrames = args.frames_average
+
+ nMgain = [0] * 5
+
+ gainCheck = -1
+ highG0Check = 0
+ printFalseGain = False
+ nGoodFrames = 0
+ nGoodFramesGain = 0
+
+ analyzeFrames = min(numberOfFrames, args.number_frames)
+
+ for n in range(analyzeFrames):
+
+ if not f[is_good_frame_location][n]:
+ continue
+
+ nGoodFrames += 1
+
+ daq_rec = (f[daq_recs_location][n])[0]
+
+ image = f[data_location][n][:]
+ frameData = (np.bitwise_and(image, 0b0011111111111111))
+ gainData = np.bitwise_and(image, 0b1100000000000000) >> 14
+ trueGain = forcedGainValue(n, args.frames_G0, args.frames_G1, args.frames_G2, args.frames_HG0) if overwriteGain else ( (daq_rec & 0b11000000000000) >> 12 )
+ highG0 = (daq_rec & 0b1)
+
+ gainGoodAllModules = True
+ if args.gain_check > 0:
+ daq_recs = f[daq_recs_location][n]
+ for i in range(len(daq_recs)):
+ if trueGain != ((daq_recs[i] & 0b11000000000000) >> 12) or highG0 != (daq_recs[i] & 0b1):
+ gainGoodAllModules = False
+
+ if highG0 == 1 and trueGain != 0:
+ gainGoodAllModules = False
+ log.info(" {} : Jungfrau is in the high G0 mode ({}), but gain settings is strange: {}".format( detector_name, highG0, trueGain))
+
+ nFramesGain = np.sum(gainData==(trueGain))
+ if nFramesGain < (nModules - 0.5 - n_bad_modules) * (1024 * 512): # make sure that most are the modules are in correct gain
+ gainGoodAllModules = False
+ log.debug(" {} : Too many bad pixels, skip the frame {}, true gain: {}(highG0: {}) ({}); gain0 : {}; gain1 : {}; gain2 : {}; undefined gain : {}".format( detector_name, n, trueGain, highG0, nFramesGain, np.sum(gainData==0), np.sum(gainData==1), np.sum(gainData==3), np.sum(gainData==2)))
+
+ if not gainGoodAllModules:
+ log.debug(" {} : In Frame Number {} : mismatch in modules and general settings, Gain: {} vs {}; HighG0: {} vs {} (or too many bad pixels)".format( detector_name, n, trueGain, ((daq_recs & 0b11000000000000) >> 12), highG0, (daq_recs & 0b1)))
+ continue
+ nGoodFramesGain += 1
+
+ if gainData[tY][tX] != trueGain:
+ if not printFalseGain:
+ log.info(" {} : Gain wrong for channel ({}x{}) should be {}, 
but {}. Frame {}. {} {}".format( detector_name, tX, tY, trueGain, gainData[tY][tX], n, trueGain, daq_rec)) + printFalseGain = True + else: + if gainCheck != -1 and printFalseGain: + log.info(" {} : Gain was wrong for channel ({}x{}) in previous frames, but now correct : {}. Frame {}.".format( detector_name, tX, tY, gainData[tY, tX], n)) + printFalseGain = False + + if gainData[tY][tX] != gainCheck or highG0Check != highG0: + log.info(" {} : Gain changed for ({}x{}) channel {} -> {} (highG0 setting: {} -> {}), frame number {}, match: {}".format( detector_name, tX, tY, gainCheck, gainData[tY][tX], highG0Check, highG0, n, gainData[tY][tX] == trueGain)) + gainCheck = gainData[tY][tX] + highG0Check = highG0 + + if gainGoodAllModules: + + pixelMask[gainData != trueGain] |= (1 << (trueGain+4*highG0)) + + trueGain += 4 * highG0 + + + nMgain[trueGain] += 1 + + if nMgain[trueGain] > averagePedestalFrames: + adcValuesN[trueGain] -= adcValuesN[trueGain] / averagePedestalFrames + adcValuesNN[trueGain] -= adcValuesNN[trueGain] / averagePedestalFrames + + adcValuesN[trueGain] += frameData + adcValuesNN[trueGain] += np.float_power(frameData, 2) + + + log.info(" {} : {} frames analyzed, {} good frames, {} frames without settings mismatch. 
Gain frames distribution (0,1,2,3,HG0) : ({})".format( detector_name, analyzeFrames, nGoodFrames, nGoodFramesGain, nMgain)) + + if args.add_pixel_mask != None: + if (os.path.isfile(args.add_pixel_mask) and os.access(args.add_pixel_mask, os.R_OK)): + additional_pixel_mask_file = h5py.File(args.add_pixel_mask, "r") + additional_pixel_mask = np.array(additional_pixel_mask_file["pixel_mask"]) + log.info("Will add additional masked pixels from file %s , number %d " % (args.add_pixel_mask, np.sum(additional_pixel_mask == 1))) + if additional_pixel_mask.shape == pixelMask.shape: + pixelMask[additional_pixel_mask == 1] |= (1 << 5) + else: + log.error(" shape of additional pixel mask ({}) doesn't match current ({})".format( additional_pixel_mask.shape, pixelMask.shape)) + else: + log.error(" Specified addition file with pixel mask not found or not reachable {}".format( args.add_pixel_mask)) + + fileNameIn = os.path.splitext(os.path.basename(args.filename))[0] + full_fileNameOut = args.directory + "/" + fileNameIn + ".res.h5" + log.info(" {} : Output file with pedestal corrections in: {}".format( detector_name, full_fileNameOut)) + outFile = h5py.File(full_fileNameOut, "w") + + gains = [None] * 4 + gainsRMS = [None] * 4 + + for gain in range(5): + numberFramesAverage = max(1, min(averagePedestalFrames, nMgain[gain])) + mean = adcValuesN[gain] / float(numberFramesAverage) + mean2 = adcValuesNN[gain] / float(numberFramesAverage) + variance = mean2 - np.float_power(mean, 2) + stdDeviation = np.sqrt(variance) + log.debug(" {} : gain {} values results (pixel ({},{}) : {} {}".format( detector_name, gain, tY, tX, mean[tY][tX], stdDeviation[tY][tX])) + if gain != 2: + g = gain if gain < 3 else (gain-1) + gains[g] = mean + gainsRMS[g] = stdDeviation + + pixelMask[np.isclose(stdDeviation,0)] |= (1 << (6 + g)) + + dset = outFile.create_dataset('pixel_mask', data=pixelMask) + dset = outFile.create_dataset('gains', data=gains) + dset = outFile.create_dataset('gainsRMS', data=gainsRMS) + 
+ outFile.close() + + log.info(" {} : Number of good pixels: {} from {} in total ({} bad pixels)".format( detector_name, np.sum(pixelMask == 0), sh_x * sh_y, (sh_x * sh_y - np.sum(pixelMask == 0)))) + + +if __name__ == "__main__": + main() diff --git a/scripts/make_crystfel_list.py b/scripts/make_crystfel_list.py index 77ae477..58df55d 100644 --- a/scripts/make_crystfel_list.py +++ b/scripts/make_crystfel_list.py @@ -26,6 +26,9 @@ def is_it_dark(laser_mode, detector_rate, pulseid): dark = True elif laser_mode == 1: dark = False + elif laser_mode == 13: + if (pulseid % int(100/detector_rate*4)) == 0: + dark = False else: if (pulseid + int(100/detector_rate) ) % dark_rate == 0: dark = True @@ -34,14 +37,27 @@ def is_it_dark(laser_mode, detector_rate, pulseid): return dark +def which_dark(laser_mode, detector_rate, pulseid): + + dark_mode = -1 + if laser_mode != 13: + dark_mode = 0 + else: + for m in range(1,4): + if ((pulseid-m*int(100/detector_rate)) % int(100/detector_rate*4)) == 0: + dark_mode = m + + return dark_mode parser = argparse.ArgumentParser() parser.add_argument("data_file", type=str) parser.add_argument("run_info", type=str) +parser.add_argument("detector", type=str) args = parser.parse_args() data_file = args.data_file run_info_file = args.run_info +detector = args.detector try: with open(run_info_file) as json_file: @@ -62,8 +78,6 @@ except: print(f"Can't open {data_file}") exit() -detector = 'JF06T32V02' - pulseids = f[f'/data/{detector}/pulse_id'][:] n_pulse_id = len(pulseids) if f'/data/{detector}/is_good_frame' in f.keys(): @@ -76,6 +90,9 @@ nProcessedFrames = 0 index_dark = [] index_light = [] + +index_dark_mode = {} + for i in range(len(pulseids)): if not is_good_frame[i]: continue @@ -84,6 +101,11 @@ for i in range(len(pulseids)): nProcessedFrames += 1 if is_it_dark(laser_mode, detector_rate, p): index_dark.append(i) + if laser_mode == 13: + dark_mode = which_dark(laser_mode, detector_rate, p) + if dark_mode not in index_dark_mode: + 
index_dark_mode[dark_mode] = [] + index_dark_mode[dark_mode].append(i) else: index_light.append(i) @@ -112,3 +134,12 @@ if len(index_light) > 0: f_list.close() +for m in index_dark_mode: + if len(index_dark_mode[m]) > 0: + file_dark = f'{data_file[:-3]}.dark{m}.lst' + print(f"List of dark{m} frames : {file_dark} , {len(index_dark_mode[m])} frames") + f_list = open(file_dark, "w") + for frame_number in index_dark_mode[m]: + print(f'{data_file} //{frame_number}', file = f_list) + f_list.close() + diff --git a/scripts/postprocess_raw.py b/scripts/postprocess_raw.py new file mode 100644 index 0000000..3d3a285 --- /dev/null +++ b/scripts/postprocess_raw.py @@ -0,0 +1,140 @@ +import os +import struct + +import bitshuffle +import h5py +import numpy as np +from bitshuffle.h5 import H5_COMPRESS_LZ4, H5FILTER # pylint: disable=no-name-in-module + +# bitshuffle hdf5 filter params +BLOCK_SIZE = 2048 +compargs = {"compression": H5FILTER, "compression_opts": (BLOCK_SIZE, H5_COMPRESS_LZ4)} +# limit bitshuffle omp to a single thread +# a better fix would be to use bitshuffle compiled without omp support +os.environ["OMP_NUM_THREADS"] = "1" + +DTYPE = np.dtype(np.uint16) +DTYPE_SIZE = DTYPE.itemsize + +MODULE_SIZE_X = 1024 +MODULE_SIZE_Y = 512 + + +def postprocess_raw( + source, dest, disabled_modules=(), index=None, compression=False, batch_size=100 +): + # a function for 'visititems' should have the args (name, object) + def _visititems(name, obj): + if isinstance(obj, h5py.Group): + h5_dest.create_group(name) + + elif isinstance(obj, h5py.Dataset): + dset_source = h5_source[name] + + # process all but the raw data + if name != data_dset: + if name.startswith("data"): + # datasets with data per image, so indexing should be applied + if index is None: + data = dset_source[:] + else: + data = dset_source[index, :] + + args = {"shape": data.shape} + h5_dest.create_dataset_like(name, dset_source, data=data, **args) + else: + h5_dest.create_dataset_like(name, dset_source, 
data=dset_source) + + else: + raise TypeError(f"Unknown h5py object type {obj}") + + # copy group/dataset attributes if it's not a dataset with the actual data + if name != data_dset: + for key, value in h5_source[name].attrs.items(): + h5_dest[name].attrs[key] = value + + with h5py.File(source, "r") as h5_source, h5py.File(dest, "w") as h5_dest: + detector_name = h5_source["general/detector_name"][()].decode() + data_dset = f"data/{detector_name}/data" + + # traverse the source file and copy/index all datasets, except the raw data + h5_source.visititems(_visititems) + + # now process the raw data + dset = h5_source[data_dset] + + args = dict() + if index is None: + n_images = dset.shape[0] + else: + index = np.array(index) + n_images = len(index) + + n_modules = dset.shape[1] // MODULE_SIZE_Y + out_shape = (MODULE_SIZE_Y * (n_modules - len(disabled_modules)), MODULE_SIZE_X) + + args["shape"] = (n_images, *out_shape) + args["maxshape"] = (n_images, *out_shape) + args["chunks"] = (1, *out_shape) + + if compression: + args.update(compargs) + + h5_dest.create_dataset_like(data_dset, dset, **args) + + # calculate and save module_map + module_map = [] + tmp = 0 + for ind in range(n_modules): + if ind in disabled_modules: + module_map.append(-1) + else: + module_map.append(tmp) + tmp += 1 + + h5_dest[f"data/{detector_name}/module_map"] = np.tile(module_map, (n_images, 1)) + + # prepare buffers to be reused for every batch + read_buffer = np.empty((batch_size, *dset.shape[1:]), dtype=DTYPE) + out_buffer = np.zeros((batch_size, *out_shape), dtype=DTYPE) + + # process and write data in batches + for batch_start_ind in range(0, n_images, batch_size): + batch_range = range(batch_start_ind, min(batch_start_ind + batch_size, n_images)) + + if index is None: + batch_ind = np.array(batch_range) + else: + batch_ind = index[batch_range] + + # TODO: avoid unnecessary buffers + read_buffer_view = read_buffer[: len(batch_ind)] + out_buffer_view = out_buffer[: len(batch_ind)] + + # 
Avoid a stride-bottleneck, see https://github.com/h5py/h5py/issues/977 + if np.sum(np.diff(batch_ind)) == len(batch_ind) - 1: + # consecutive index values + dset.read_direct(read_buffer_view, source_sel=np.s_[batch_ind]) + else: + for i, j in enumerate(batch_ind): + dset.read_direct(read_buffer_view, source_sel=np.s_[j], dest_sel=np.s_[i]) + + for i, m in enumerate(module_map): + if m == -1: + continue + + read_slice = read_buffer_view[:, i * MODULE_SIZE_Y : (i + 1) * MODULE_SIZE_Y, :] + out_slice = out_buffer_view[:, m * MODULE_SIZE_Y : (m + 1) * MODULE_SIZE_Y, :] + out_slice[:] = read_slice + + bytes_num_elem = struct.pack(">q", out_shape[0] * out_shape[1] * DTYPE_SIZE) + bytes_block_size = struct.pack(">i", BLOCK_SIZE * DTYPE_SIZE) + header = bytes_num_elem + bytes_block_size + + for pos, im in zip(batch_range, out_buffer_view): + if compression: + byte_array = header + bitshuffle.compress_lz4(im, BLOCK_SIZE).tobytes() + else: + byte_array = im.tobytes() + + h5_dest[data_dset].id.write_direct_chunk((pos, 0, 0), byte_array) diff --git a/scripts/retrieve_detector_data.sh b/scripts/retrieve_detector_data.sh index ec13813..2fd4614 100755 --- a/scripts/retrieve_detector_data.sh +++ b/scripts/retrieve_detector_data.sh @@ -55,22 +55,42 @@ case ${DETECTOR} in NM=9 DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF02.json ;; +'JF04T01V01') + NM=1 + DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF04.json + ;; +'JF03T01V02') + NM=1 + DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF03.json + ;; 'JF06T32V02') NM=32 DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF06.json ;; 'JF06T08V02') NM=8 - DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF06_4M.json + DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF06_4M.daq8.json ;; 'JF07T32V01') NM=32 DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF07.json ;; +'JF09T01V01') + NM=1 + 
DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF09.json + ;; +'JF10T01V01') + NM=1 + DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF10.json + ;; 'JF13T01V01') NM=1 DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF13.json ;; +'JF11T04V01') + NM=4 + DET_CONFIG_FILE=/gpfs/photonics/swissfel/buffer/config/stream-JF11.json + ;; *) NM=1 esac @@ -86,9 +106,6 @@ PREVIOUS_STILL_RUN=0 while [ ${PREVIOUS_STILL_RUN} == 0 ] do sleep 15 # we need to sleep at least to make sure that we don't read from CURRENT file -# ps -fe | grep "bin/sf_writer " | grep -v grep | grep sf_writer > /dev/null -# PREVIOUS_STILL_RUN=$? # not found == 1 -# PREVIOUS_STILL_RUN=1 n=`ps -fe | grep "bin/sf_writer " | grep -v grep | grep sf_writer | wc -l` if [ ${n} -lt 9 ] then @@ -119,15 +136,19 @@ else fi fi -taskset -c ${coreAssociated} /usr/bin/sf_writer ${OUTFILE_RAW} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${NM} ${START_PULSE_ID} ${STOP_PULSE_ID} ${PULSE_ID_STEP}>> /tmp/detector_retrieve.log & +taskset -c ${coreAssociated} /usr/local/bin/sf_writer ${OUTFILE_RAW} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${NM} ${START_PULSE_ID} ${STOP_PULSE_ID} ${PULSE_ID_STEP}>> /tmp/detector_retrieve.log & wait #coreAssociatedConversion="35,34,33,32,31,30,29,28,27" coreAssociatedConversion="35,34,33,32,31,30,29,28,27,26,25,24,23,22,21,20,19,18" +#coreAssociatedConversion="26,25,24,23,22,21,20,19,18" #TODO: calculate this number from coreAssociatedConversion #export NUMBA_NUM_THREADS=18 +#not clear why, but bitshuffle doesn't respect OMP_NUM_THREADS set in jungfrau_utils anymore, thus we set it here +export OMP_NUM_THREADS=1 + date3=$(date +%s) echo "Finished : "`date` echo -n "Retrieve Time : " @@ -136,6 +157,56 @@ echo $((date3-date2)) | awk '{print int($1/60)":"int($1%60)}' if [ ${JF_CONVERSION} == 0 ] then echo "File is written in raw format, no compression" + + dir_name=`dirname ${OUTFILE_RAW}` + base_name=`basename ${dir_name}` + + if [ ${base_name} == 
"JF_pedestals" ] + then + echo "Pedestal run will make conversion" + + export PATH=/home/dbe/miniconda3/bin:$PATH + + source /home/dbe/miniconda3/etc/profile.d/conda.sh + + conda deactivate + conda activate sf-daq + + if [ ${DETECTOR} == "JF07T32V01" ] + then + time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG --add_pixel_mask /sf/bernina/config/jungfrau/pixel_mask/JF07T32V01/pixel_mask_13_full.h5 + elif [ ${DETECTOR} == "JF03T01V02" ] + then + time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG --add_pixel_mask /sf/bernina/config/jungfrau/pixel_mask/JF03T01V02/pixel_mask_half_chip.h5 +# time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG + elif [ ${DETECTOR} == "JF02T09V02" ] + then + time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG --number_bad_modules=1 + elif [ ${DETECTOR} == "JF06T08V02" ] + then + time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG --add_pixel_mask /sf/alvra/config/jungfrau/pixel_mask/JF06T08V01/mask_2lines_module3.h5 +# elif [ ${DETECTOR} == "JF06T32V02" ] +# then +# time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG --add_pixel_mask /sf/alvra/config/jungfrau/pixel_mask/JF06T32V02/mask_noise_in_28.h5 + elif [ ${DETECTOR} == "JF13T01V01" ] + then + time 
taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG --add_pixel_mask /sf/bernina/config/jungfrau/pixel_mask/JF13T01V01/pixel_mask_bad_rb_22.09.2020.h5 + elif [ ${DETECTOR} == "JF11T04V01" ] + then + time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG --number_bad_modules=2 + elif [ ${DETECTOR} == "JF10T01V01" ] + then + time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG --number_bad_modules=1 + else + time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/jungfrau_create_pedestals.py --filename ${OUTFILE_RAW} --directory ${dir_name} --verbosity DEBUG + fi + + PEDESTAL_FILE=`echo ${OUTFILE_RAW} | sed 's/.h5/.res.h5/'` + + taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/copy_pedestal_file.py ${PEDESTAL_FILE} ${RUN_FILE} ${DETECTOR} ${DET_CONFIG_FILE} + + fi + else echo "Will call compression/convertion ${OUTFILE_RAW} --> ${OUTFILE}" @@ -144,7 +215,7 @@ else do sleep 15 # we need to sleep at least to make sure that we don't read from CURRENT file n=`ps -fe | grep "scripts/export_file.py " | grep -v grep | grep export | wc -l` - if [ ${n} -lt 18 ] + if [ ${n} -lt 15 ] then PREVIOUS_STILL_RUN=1 fi @@ -154,10 +225,17 @@ else echo $((date4-date3)) | awk '{print int($1/60)":"int($1%60)}' export PATH=/home/dbe/miniconda3/bin:$PATH - source deactivate >/dev/null 2>&1 - source activate conversion - taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/export_file.py ${OUTFILE_RAW} ${OUTFILE} ${RUN_FILE} ${DET_CONFIG_FILE} - python /home/dbe/git/sf_daq_buffer/scripts/make_crystfel_list.py ${OUTFILE} 
${RUN_FILE} + + source /home/dbe/miniconda3/etc/profile.d/conda.sh + + conda deactivate + conda activate sf-daq + + time taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/export_file.py ${OUTFILE_RAW} ${OUTFILE} ${RUN_FILE} ${DET_CONFIG_FILE} + if [ ${DETECTOR} == "JF06T32V02" ] || [ ${DETECTOR} == "JF06T08V02" ] + then + taskset -c ${coreAssociatedConversion} python /home/dbe/git/sf_daq_buffer/scripts/make_crystfel_list.py ${OUTFILE} ${RUN_FILE} ${DETECTOR} + fi date5=$(date +%s) echo "Finished : "`date` echo -n "Conversion Time : " diff --git a/scripts/sf_daq_buffer.setup.sh b/scripts/sf_daq_buffer.setup.sh new file mode 100755 index 0000000..828de0b --- /dev/null +++ b/scripts/sf_daq_buffer.setup.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# needed, otherwise executing with Ansible won't work +# see: https://github.com/conda/conda/issues/7267 +unset SUDO_UID SUDO_GID SUDO_USER + +if [ ! -d /home/dbe/git ]; then + echo "No git repo found, cloning it..." + mkdir /home/dbe/git +fi + +REPO=sf_daq_buffer +if [ ! -d /home/dbe/git/${REPO} ]; then + cd /home/dbe/git && git clone https://github.com/paulscherrerinstitute/${REPO}.git + + source /opt/rh/devtoolset-9/enable + cd /home/dbe/git/${REPO} && mkdir -p build && cd build/ && cmake3 .. && make +fi + + diff --git a/scripts/start_detector.sh b/scripts/start_detector.sh deleted file mode 100755 index d87b4e9..0000000 --- a/scripts/start_detector.sh +++ /dev/null @@ -1,52 +0,0 @@ -#!/bin/bash - -if [ $# -lt 1 ] -then - echo "Usage : $0 DETECTOR_NAME " - echo " DETECTOR_NAME: JF07 or JF01..." 
- echo " number_of_cycles : optional, default 100" - exit -fi - -DETECTOR=$1 -case ${DETECTOR} in -'JF01') - D=1 - ;; -'JF02') - D=2 - ;; -'JF06') - D=6 - ;; -'JF07') - D=7 - ;; -'JF13') - D=13 - ;; -*) - echo "Unsupported detector" - exit - ;; -esac - -n_cycles=100 -if [ $# == 2 ] -then - n_cycles=$2 -fi - -export PATH=/home/dbe/miniconda3/bin:$PATH -source deactivate -source activate dia - -sls_detector_put ${D}-timing trigger -sls_detector_put ${D}-cycles ${n_cycles} -sls_detector_put ${D}-exptime 5e-06 -sls_detector_put ${D}-frames 1 -sls_detector_put ${D}-dr 16 -#sls_detector_put ${D}-clearbit to 0x5d 0 # normal mode, not highG0 -sls_detector_put ${D}-status start - -echo "Now start trigger" diff --git a/scripts/streamvis_setup.sh b/scripts/streamvis_setup.sh new file mode 100755 index 0000000..346ad3d --- /dev/null +++ b/scripts/streamvis_setup.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +# needed, otherwise executing with Ansible won't work +# see: https://github.com/conda/conda/issues/7267 +unset SUDO_UID SUDO_GID SUDO_USER + +if [ ! -f /home/dbe/miniconda3/bin/conda ] +then + echo "Getting Miniconda" + wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh + sh Miniconda3-latest-Linux-x86_64.sh -b -p /home/dbe/miniconda3 + + rm -rf Miniconda3-latest-Linux-x86_64.sh +fi + +# Setup the conda environment. +export PATH=/home/dbe/miniconda3/bin:$PATH + +source /home/dbe/miniconda3/etc/profile.d/conda.sh + +CONDA_ENV_NAME=vis +envtest=$(conda env list | grep ${CONDA_ENV_NAME}) + +if [ $? 
!= 0 ]; then + echo "Creating the ${CONDA_ENV_NAME} environment" + conda create -n vis -y -c paulscherrerinstitute -c conda-forge streamvis +fi + diff --git a/scripts/test_conversion.sh b/scripts/test_conversion.sh new file mode 100755 index 0000000..18567e8 --- /dev/null +++ b/scripts/test_conversion.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +export PATH=/home/dbe/miniconda3/bin:$PATH +source /home/dbe/miniconda3/etc/profile.d/conda.sh +conda deactivate +conda activate sf-daq + +export OMP_NUM_THREADS=1 + +#export NUMBA_NUM_THREADS=$1 +#OUTDIR=/sf/alvra/data/p18674/raw/run_info/003000/CONVERSION-PAR-${NUMBA_NUM_THREADS} +OUTDIR=/sf/alvra/data/p18674/raw/run_info/003000/CONVERSION-NEW.21-daq1 + +#coreAssociatedBuffer=(35 34 33 32 31 30 29 28 27 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 8 7 6 5 4 3 2 1 0) +#coreAssociatedBuffer=(35 34 33 32 31 30 29 28 27 18 19 20 21 22 23 24 25 26 9 10 11 12 13 14 15 16 17 8 7 6 5 4 3 2 1 0) + +coreAssociatedBuffer=(35 26 34 25 33 24 32 23 31 22 30 21 29 20 28 19 27 18 17 8 16 7 15 6 14 5 13 4 12 3 11 2 10 1 9) + +coreAssociated="35,26,34,25,33,24,32,23,31,22,30,21,29,20,28,19,27,18" +#coreAssociated="35,34,33,32,31,30,29,28,27" +#coreAssociated="26,25,24,23,22,21,20,19,18" + +#for N in 1 3 5 7 9 11 13 15 17 19 21 23 25 27 29 31 33 35 +for N in 10 12 14 16 18 20 1 2 4 6 8 22 24 26 28 30 32 35 +do + + for n in `seq -f %02g 1 $N` + do + rm -rf /sf/alvra/data/p18674/raw/run_info/003000/conversion_0030${n}.log + sleep 0.1 + +# c=`echo $n - 1 | bc` +# echo process : $n cores : ${coreAssociatedBuffer[10#${c}]} +# taskset -c ${coreAssociatedBuffer[10#${c}]} python /home/dbe/git/sf_daq_buffer/scripts/export_file.py /sf/alvra/data/p18674/raw//RAW_DATA/test_16M/run_0030${n}.JF06T32V02.h5 /sf/alvra/data/p18674/raw/test_16M/run_0030${n}.JF06T32V02.h5 /sf/alvra/data/p18674/raw/run_info/003000/run_0030${n}.json /gpfs/photonics/swissfel/buffer/config/stream-JF06.json > /sf/alvra/data/p18674/raw/run_info/003000/conversion_0030${n}.log & + + 
echo process : $n cores :${coreAssociated} + rm -rf /sf/alvra/data/p18674/raw/run_info/003000/conversion_0030${n}.log + taskset -c ${coreAssociated} python /home/dbe/git/sf_daq_buffer/scripts/export_file.py /sf/alvra/data/p18674/raw//RAW_DATA/test_16M/run_0030${n}.JF06T32V02.h5 /sf/alvra/data/p18674/raw/test_16M/run_0030${n}.JF06T32V02.h5 /sf/alvra/data/p18674/raw/run_info/003000/run_0030${n}.json /gpfs/photonics/swissfel/buffer/config/stream-JF06.json > /sf/alvra/data/p18674/raw/run_info/003000/conversion_0030${n}.log & + + done + echo Submitted + + A=0 + while [ $A -lt 30 ] + do + sleep 30 + A=`grep read /sf/alvra/data/p18674/raw/run_info/003000/conversion_003001.log | wc -l` + echo Number of cycles passed $A + done + + K=`ps -fe | grep export | grep -v grep | awk '{print $2}' | xargs` + echo Killing `ps -fe | grep export | grep -v grep | awk '{print $2}' | wc -l` processes ${K} + kill -9 ${K} + + sleep 2 + + mkdir -p ${OUTDIR}/${N} + mv /sf/alvra/data/p18674/raw/run_info/003000/conversion_0030* ${OUTDIR}/${N}/. + +done diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp index 195bbde..e34e637 100644 --- a/sf-stream/src/ZmqLiveSender.cpp +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -10,6 +10,8 @@ #include #include +#include +// using namespace std; using namespace stream_config; @@ -45,7 +47,6 @@ ZmqLiveSender::ZmqLiveSender( throw runtime_error(zmq_strerror(errno)); } } - } ZmqLiveSender::~ZmqLiveSender()