Compare commits

..

14 Commits

Author SHA1 Message Date
76ff39c012 Merge branch '2405-maxiv-day-2' into 'main'
MAX IV experiment day 2 corrections

See merge request jungfraujoch/nextgendcu!65
2024-05-23 12:15:36 +02:00
3ef89483e8 MAX IV experiment day 2 corrections 2024-05-23 12:15:36 +02:00
e185fbb3f5 Merge branch '2405-maxiv-day-1' into 'main'
MAX IV test day 1 modifications

See merge request jungfraujoch/nextgendcu!64
2024-05-21 10:22:37 +02:00
701c083739 MAX IV test day 1 modifications 2024-05-21 10:22:37 +02:00
03662506c6 Merge branch '2405-writer-no-common' into 'main'
Simplify dependencies for jfjoch_writer

See merge request jungfraujoch/nextgendcu!63
2024-05-20 18:05:36 +02:00
1b297babe9 Simplify dependencies for jfjoch_writer 2024-05-20 18:05:36 +02:00
adc7aa7c7d Merge branch '2405-dependencies-3' into 'main'
Build RPM for DKMS driver

See merge request jungfraujoch/nextgendcu!62
2024-05-20 11:40:30 +02:00
27e17c316d Build RPM for DKMS driver 2024-05-20 11:40:30 +02:00
d7d66dc85c Merge branch '2405-dependencies-2' into 'main'
Minor fixes for dependencies

See merge request jungfraujoch/nextgendcu!61
2024-05-19 14:09:00 +02:00
2a8fc3a466 Minor fixes for dependencies 2024-05-19 14:09:00 +02:00
e4acd93b88 Merge branch '2405-dependencies' into 'main'
Change the way dependencies are handled

See merge request jungfraujoch/nextgendcu!60
2024-05-17 19:19:17 +02:00
4ca397bd42 Change the way dependencies are handled 2024-05-17 19:19:17 +02:00
4d780c1dda Merge branch '2405-zmq-preview' into 'main'
1.0.0-rc2: Fixes in preview

See merge request jungfraujoch/nextgendcu!59
2024-05-16 17:41:47 +02:00
949f693311 1.0.0-rc2: Fixes in preview 2024-05-16 17:41:47 +02:00
70 changed files with 3174 additions and 445 deletions

View File

@@ -34,7 +34,7 @@ build:x86:gcc_writer:
- cd build
- source /opt/rh/gcc-toolset-12/enable
- cmake -DCMAKE_BUILD_TYPE=Release -DJFJOCH_WRITER_ONLY=ON ..
- make -j48 jfjoch
- make -j48
build:x86:driver:
stage: build
@@ -83,16 +83,35 @@ build:x86:frontend:
- x86
needs: []
script:
- cd frontend_ui
- npm install
- npm run build
- mkdir build
- cd build
- /usr/bin/cmake ..
- make frontend
- cd ../frontend_ui/build
- tar czf ../../jfjoch_frontend.tar.gz *
artifacts:
paths:
- jfjoch_frontend.tar.gz
expire_in: 1 week
build:x86:rpm:
stage: build
tags:
- x86
needs: []
script:
- mkdir build
- cd build
- source /opt/rh/gcc-toolset-12/enable
- cmake -DCMAKE_BUILD_TYPE=Release ..
- make frontend
- make -j48 package
- mv *.rpm ..
artifacts:
paths:
- "*.rpm"
expire_in: 1 week
test:x86:gcc:
stage: test
timeout: 90m
@@ -292,9 +311,16 @@ release:
- synthesis:vivado_pcie_100g
- build:x86:frontend
- build:x86:driver
- build:x86:rpm
script:
- export PACKAGE_VERSION=`head -n1 VERSION`
- export PACKAGE_REGISTRY_URL="${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/generic/jungfraujoch/${PACKAGE_VERSION}"
- 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file jfjoch-driver-dkms-${PACKAGE_VERSION}-1.el8.noarch.rpm "${PACKAGE_REGISTRY_URL}/jfjoch-driver-dkms-${PACKAGE_VERSION}-1.el8.noarch.rpm"'
- 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file jfjoch-driver-dkms-${PACKAGE_VERSION}-1.el8.noarch.rpm "${PACKAGE_REGISTRY_URL}/jfjoch-driver-dkms.el8.noarch.rpm"'
- 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file jfjoch-writer-${PACKAGE_VERSION}-1.el8.x86_64.rpm "${PACKAGE_REGISTRY_URL}/jfjoch-writer-${PACKAGE_VERSION}-1.el8.x86_64.rpm"'
- 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file jfjoch-writer-${PACKAGE_VERSION}-1.el8.x86_64.rpm "${PACKAGE_REGISTRY_URL}/jfjoch-writer.el8.x86_64.rpm"'
- 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file jfjoch-${PACKAGE_VERSION}-1.el8.x86_64.rpm "${PACKAGE_REGISTRY_URL}/jfjoch-${PACKAGE_VERSION}-1.el8.x86_64.rpm"'
- 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file jfjoch-${PACKAGE_VERSION}-1.el8.x86_64.rpm "${PACKAGE_REGISTRY_URL}/jfjoch.el8.x86_64.rpm"'
- 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file jfjoch_driver.tar.gz "${PACKAGE_REGISTRY_URL}/jfjoch_driver.tar.gz"'
- 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file jfjoch_frontend.tar.gz "${PACKAGE_REGISTRY_URL}/jfjoch_frontend.tar.gz"'
- 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file jfjoch_fpga_pcie_100g.mcs "${PACKAGE_REGISTRY_URL}/jfjoch_fpga_pcie_100g.mcs"'
@@ -305,3 +331,9 @@ release:
--assets-link "{\"name\":\"jfjoch_frontend.tar.gz\",\"url\":\"${PACKAGE_REGISTRY_URL}/jfjoch_frontend.tar.gz\"}"
--assets-link "{\"name\":\"jfjoch_fpga_pcie_8x10g.mcs\",\"url\":\"${PACKAGE_REGISTRY_URL}/jfjoch_fpga_pcie_8x10g.mcs\"}"
--assets-link "{\"name\":\"jfjoch_fpga_pcie_100g.mcs\",\"url\":\"${PACKAGE_REGISTRY_URL}/jfjoch_fpga_pcie_100g.mcs\"}"
--assets-link "{\"name\":\"jfjoch-${PACKAGE_VERSION}-1.el8.x86_64.rpm\",\"url\":\"${PACKAGE_REGISTRY_URL}/jfjoch-${PACKAGE_VERSION}-1.el8.x86_64.rpm\"}"
--assets-link "{\"name\":\"jfjoch-writer-${PACKAGE_VERSION}-1.el8.x86_64.rpm\",\"url\":\"${PACKAGE_REGISTRY_URL}/jfjoch-writer-${PACKAGE_VERSION}-1.el8.x86_64.rpm\"}"
--assets-link "{\"name\":\"jfjoch-driver-dkms-${PACKAGE_VERSION}-1.el8.noarch.rpm\",\"url\":\"${PACKAGE_REGISTRY_URL}/jfjoch-driver-dkms-${PACKAGE_VERSION}-1.el8.noarch.rpm\"}"
--assets-link "{\"name\":\"jfjoch.el8.x86_64.rpm\",\"url\":\"${PACKAGE_REGISTRY_URL}/jfjoch.el8.x86_64.rpm\"}"
--assets-link "{\"name\":\"jfjoch-writer.el8.x86_64.rpm\",\"url\":\"${PACKAGE_REGISTRY_URL}/jfjoch-writer.el8.x86_64.rpm\"}"
--assets-link "{\"name\":\"jfjoch-driver-dkms.el8.noarch.rpm\",\"url\":\"${PACKAGE_REGISTRY_URL}/jfjoch-driver-dkms.el8.noarch.rpm\"}"

View File

@@ -1,7 +1,9 @@
CMAKE_MINIMUM_REQUIRED(VERSION 3.19)
PROJECT(Jungfraujoch VERSION 1.0 LANGUAGES C CXX)
LIST(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
FILE(STRINGS VERSION JFJOCH_VERSION)
PROJECT(jfjoch VERSION 1.0.0 LANGUAGES C CXX)
SET(CMAKE_POLICY_DEFAULT_CMP0077 NEW)
SET(CMAKE_CXX_STANDARD 20)
SET(CMAKE_CXX_STANDARD_REQUIRED True)
@@ -9,8 +11,31 @@ SET(CMAKE_CXX_STANDARD_REQUIRED True)
SET(CMAKE_CXX_FLAGS_RELEASE "-O3 -march=native -mtune=native -Wno-deprecated-enum-enum-conversion")
SET(CMAKE_C_FLAGS_RELEASE "-O3 -march=native -mtune=native")
SET(BUILD_SHARED_LIBS OFF)
SET(BUILD_TESTING OFF)
SET(ZSTD_LEGACY_SUPPORT OFF)
SET(ZSTD_MULTITHREAD_SUPPORT OFF)
SET(ZSTD_BUILD_PROGRAMS OFF)
SET(ZSTD_BUILD_SHARED OFF)
SET(SLS_USE_RECEIVER OFF)
SET(SLS_USE_RECEIVER_BINARIES OFF)
SET(SLS_BUILD_SHARED_LIBRARIES OFF)
SET(BUILD_FAST_INDEXER OFF)
SET(BUILD_FAST_INDEXER_STATIC ON)
SET(HDF5_ENABLE_SZIP_SUPPORT OFF)
SET(HDF5_ENABLE_SZIP_ENCODING OFF)
SET(HDF5_BUILD_EXAMPLES OFF)
SET(HDF5_BUILD_CPP_LIB OFF)
SET(HDF5_ENABLE_Z_LIB_SUPPORT OFF)
SET(HDF5_EXTERNALLY_CONFIGURED 1)
SET(jbig OFF)
SET(zstd OFF)
SET(lzma OFF)
INCLUDE(CheckLanguage)
CHECK_LANGUAGE(CUDA)
@@ -37,10 +62,22 @@ CHECK_INCLUDE_FILE(numa.h HAS_NUMA_H)
include(FetchContent)
FetchContent_Declare(tiff
GIT_REPOSITORY https://github.com/fleon-psi/libtiff
GIT_TAG v4.6.0
EXCLUDE_FROM_ALL)
FetchContent_Declare(hdf5
GIT_REPOSITORY https://github.com/HDFGroup/hdf5/
GIT_TAG hdf5_1.14.4.2
GIT_SHALLOW 1
EXCLUDE_FROM_ALL)
FetchContent_Declare(
pistache_http
GIT_REPOSITORY https://github.com/fleon-psi/pistache
GIT_TAG 51553b92cc7bb25ac792462722ddd4fae33d14b1
EXCLUDE_FROM_ALL
)
FetchContent_Declare(
@@ -48,6 +85,7 @@ FetchContent_Declare(
GIT_REPOSITORY https://github.com/facebook/zstd
GIT_TAG 794ea1b0afca0f020f4e57b6732332231fb23c70
SOURCE_SUBDIR build/cmake
EXCLUDE_FROM_ALL
)
FetchContent_Declare(
@@ -60,9 +98,10 @@ FetchContent_Declare(
catch2
GIT_REPOSITORY https://github.com/catchorg/Catch2
GIT_TAG 4e8d92b
EXCLUDE_FROM_ALL
)
FetchContent_MakeAvailable(pistache_http zstd sls_detector_package catch2)
FetchContent_MakeAvailable(pistache_http zstd sls_detector_package catch2 hdf5 tiff)
ADD_SUBDIRECTORY(jungfrau)
ADD_SUBDIRECTORY(compression)
@@ -73,7 +112,6 @@ ADD_SUBDIRECTORY(frame_serialize)
ADD_SUBDIRECTORY(detector_control)
IF (JFJOCH_WRITER_ONLY)
MESSAGE(STATUS "Compiling HDF5 writer only")
SET(jfjoch_executables jfjoch_writer)
ELSE()
ADD_SUBDIRECTORY(broker)
ADD_SUBDIRECTORY(fpga)
@@ -83,16 +121,51 @@ ELSE()
ADD_SUBDIRECTORY(tests)
ADD_SUBDIRECTORY(tools)
ADD_SUBDIRECTORY(preview)
SET(jfjoch_executables jfjoch_broker jfjoch_writer jfjoch_test CompressionBenchmark HDF5DatasetWriteTest jfjoch_udp_simulator sls_detector_put sls_detector_get)
ENDIF()
ADD_CUSTOM_COMMAND(OUTPUT frontend_ui/build/index.html
COMMAND npm run build
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/frontend_ui)
ADD_CUSTOM_TARGET(frontend DEPENDS frontend_ui/build/index.html)
IF (NOT JFJOCH_WRITER_ONLY)
ADD_CUSTOM_COMMAND(OUTPUT frontend_ui/build/index.html
COMMAND npm install
COMMAND npm run build
COMMAND npm run redocly
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/frontend_ui)
ADD_CUSTOM_TARGET(frontend DEPENDS frontend_ui/build/index.html)
ADD_CUSTOM_TARGET(jfjoch DEPENDS ${jfjoch_executables})
INSTALL(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/fpga/pcie_driver/
DESTINATION /usr/src/jfjoch-1.0.0
COMPONENT driver-dkms
FILES_MATCHING PATTERN "*.c" PATTERN "*.h" PATTERN "Makefile" PATTERN "dkms.conf")
FILE(MAKE_DIRECTORY ${CMAKE_SOURCE_DIR}/frontend_ui/build/)
INSTALL(DIRECTORY ${CMAKE_SOURCE_DIR}/frontend_ui/build/ DESTINATION share/jfjoch/frontend COMPONENT jfjoch )
ENDIF()
IF(CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT)
SET(CMAKE_INSTALL_PREFIX /opt/jfjoch CACHE PATH "Default directory" FORCE)
ENDIF(CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT)
# Set Package Name
set(CPACK_PACKAGE_NAME "jfjoch")
SET(CPACK_COMPONENTS_ALL jfjoch writer driver-dkms)
SET(CPACK_GENERATOR RPM)
SET(CPACK_RPM_COMPONENT_INSTALL ON)
SET(CPACK_RPM_MAIN_COMPONENT jfjoch)
SET(CPACK_RPM_PACKAGE_RELEASE_DIST ON)
SET(CPACK_RPM_FILE_NAME "RPM-DEFAULT")
SET(CPACK_RPM_PACKAGE_VERSION ${JFJOCH_VERSION})
SET(CPACK_RPM_PACKAGE_RELEASE 1)
SET(CPACK_RPM_PACKAGE_SUMMARY "Jungfraujoch data acquisition system")
SET(CPACK_RPM_PACKAGE_DESCRIPTION "Jungfraujoch")
SET(CPACK_RPM_DRIVER-DKMS_PACKAGE_REQUIRES "dkms, gcc, bash, sed")
SET(CPACK_RPM_DRIVER-DKMS_PACKAGE_ARCHITECTURE "noarch")
SET(CPACK_RPM_DRIVER-DKMS_POST_INSTALL_SCRIPT_FILE ${CMAKE_CURRENT_SOURCE_DIR}/fpga/pcie_driver/postinstall.sh)
SET(CPACK_RPM_DRIVER-DKMS_PRE_UNINSTALL_SCRIPT_FILE ${CMAKE_CURRENT_SOURCE_DIR}/fpga/pcie_driver/preuninstall.sh)
# Set The Vendor Name
SET(CPACK_PACKAGE_VENDOR "Paul Scherrer Institut")
# Set The License Information
SET(CPACK_RPM_PACKAGE_LICENSE "Proprietary")
INCLUDE(CPack)

View File

@@ -4,7 +4,7 @@ FROM harbor.maxiv.lu.se/dockerhub/library/ubuntu:22.04
RUN set -ex; \
apt-get update; \
apt-get install -y pkg-config git cmake make g++ libhdf5-dev libczmq-dev;\
apt-get install -y pkg-config git cmake make g++;\
rm -rf /var/lib/apt/lists/*

View File

@@ -33,9 +33,7 @@ Other linux platforms should work, but no tests were done so far.
### Dependencies
Required:
* C++20 compiler and C++20 standard library; recommended GCC 11+ or clang 14+ (Intel OneAPI, AMD AOCC)
* CMake version 3.21 or newer + GNU make tool
* HDF5 library version 1.10 or newer
* TIFF library (with C++ headers)
* CMake version 3.21 or newer + GNU make tool
* JPEG library (turbo-jpeg is also OK)
Optional:
@@ -43,14 +41,16 @@ Optional:
* NUMA library - to pin threads to nodes/CPUs
* Node.js - to make frontend
Automatically downloaded by CMake:
Automatically downloaded by CMake and statically linked:
* SLS Detector Package - see [github.com/slsdetectorgroup/slsDetectorPackage](https://github.com/slsdetectorgroup/slsDetectorPackage)
* Zstandard (Facebook) - see [github.com/facebook/zstd](https://github.com/facebook/zstd)
* Pistache webserver - see [github.com/pistacheio/pistache](https://github.com/pistacheio/pistache)
* Fast feedback indexer (Hans-Christian Stadler, PSI) - see [github.com/paulscherrerinstitute/fast-feedback-indexer](https://github.com/paulscherrerinstitute/fast-feedback-indexer)
* Catch2 testing library - see [github.com/catchorg/Catch2](https://github.com/catchorg/Catch2)
* HDF5 library - see [github.com/HDFGroup/hdf5](https://github.com/HDFGroup/hdf5)
* TIFF library - see [gitlab.com/libtiff/libtiff](https://gitlab.com/libtiff/libtiff)
Please follow the link provided above to check for LICENSE file
Please follow the link provided above to check for LICENSE file. Building code with dependencies above requires access from the build system to github.com.
Directly included in the repository:
* JSON parser/writer from N. Lohmann - see [github.com/nlohmann/json](https://github.com/nlohmann/json)

View File

@@ -1 +1 @@
1.0.0-rc.1
1.0.0_rc.6

View File

@@ -15,4 +15,6 @@ TARGET_LINK_LIBRARIES(JFJochBroker JFJochReceiver JFJochDetector JFJochCommon JF
ADD_EXECUTABLE(jfjoch_broker jfjoch_broker.cpp)
TARGET_LINK_LIBRARIES(jfjoch_broker JFJochBroker)
INSTALL(TARGETS jfjoch_broker RUNTIME)
INSTALL(TARGETS jfjoch_broker RUNTIME COMPONENT jfjoch)
INSTALL(FILES redoc-static.html DESTINATION jfjoch/frontend COMPONENT jfjoch )

View File

@@ -67,6 +67,7 @@ inline org::openapitools::server::model::Measurement_statistics Convert(const Me
if (input.bkg_estimate)
ret.setBkgEstimate(input.bkg_estimate.value());
ret.setUnitCell(input.unit_cell);
return ret;
}

View File

@@ -402,6 +402,8 @@ void JFJochStateMachine::SetFullMeasurementOutput(const JFJochServicesOutput &ou
tmp.detector_height = experiment.GetYPixelsNum();
tmp.detector_pixel_depth = experiment.GetPixelDepth();
tmp.images_expected = experiment.GetImageNum();
tmp.unit_cell = experiment.GetUnitCellString();
tmp.compression_ratio = output.receiver_output.status.compressed_ratio;
tmp.collection_efficiency = output.receiver_output.efficiency;
@@ -426,6 +428,8 @@ void JFJochStateMachine::ClearAndSetMeasurementStatistics() {
tmp.detector_width = experiment.GetYPixelsNum();
tmp.detector_pixel_depth = experiment.GetPixelDepth();
tmp.images_expected = experiment.GetImageNum();
tmp.unit_cell = experiment.GetUnitCellString();
measurement_statistics = tmp;
}
@@ -445,6 +449,7 @@ std::optional<MeasurementStatistics> JFJochStateMachine::GetMeasurementStatistic
tmp.detector_height = experiment.GetYPixelsNum();
tmp.detector_pixel_depth = experiment.GetPixelDepth();
tmp.images_expected = experiment.GetImageNum();
tmp.unit_cell = experiment.GetUnitCellString();
tmp.compression_ratio = rcv_status->compressed_ratio;
tmp.images_collected = rcv_status->images_collected;

View File

@@ -56,6 +56,8 @@ struct MeasurementStatistics {
std::optional<float> bkg_estimate;
std::optional<std::pair<float, float>> beam_center_drift_pxl;
std::string unit_cell;
};
struct DetectorSettings {

View File

@@ -51,6 +51,8 @@ Measurement_statistics::Measurement_statistics()
m_Detector_pixel_depthIsSet = false;
m_Bkg_estimate = 0.0f;
m_Bkg_estimateIsSet = false;
m_Unit_cell = "";
m_Unit_cellIsSet = false;
}
@@ -106,7 +108,7 @@ bool Measurement_statistics::validate(std::stringstream& msg, const std::string&
}
}
return success;
}
@@ -158,7 +160,10 @@ bool Measurement_statistics::operator==(const Measurement_statistics& rhs) const
((!detectorPixelDepthIsSet() && !rhs.detectorPixelDepthIsSet()) || (detectorPixelDepthIsSet() && rhs.detectorPixelDepthIsSet() && getDetectorPixelDepth() == rhs.getDetectorPixelDepth())) &&
((!bkgEstimateIsSet() && !rhs.bkgEstimateIsSet()) || (bkgEstimateIsSet() && rhs.bkgEstimateIsSet() && getBkgEstimate() == rhs.getBkgEstimate()))
((!bkgEstimateIsSet() && !rhs.bkgEstimateIsSet()) || (bkgEstimateIsSet() && rhs.bkgEstimateIsSet() && getBkgEstimate() == rhs.getBkgEstimate())) &&
((!unitCellIsSet() && !rhs.unitCellIsSet()) || (unitCellIsSet() && rhs.unitCellIsSet() && getUnitCell() == rhs.getUnitCell()))
;
}
@@ -201,6 +206,8 @@ void to_json(nlohmann::json& j, const Measurement_statistics& o)
j["detector_pixel_depth"] = o.m_Detector_pixel_depth;
if(o.bkgEstimateIsSet())
j["bkg_estimate"] = o.m_Bkg_estimate;
if(o.unitCellIsSet())
j["unit_cell"] = o.m_Unit_cell;
}
@@ -281,6 +288,11 @@ void from_json(const nlohmann::json& j, Measurement_statistics& o)
j.at("bkg_estimate").get_to(o.m_Bkg_estimate);
o.m_Bkg_estimateIsSet = true;
}
if(j.find("unit_cell") != j.end())
{
j.at("unit_cell").get_to(o.m_Unit_cell);
o.m_Unit_cellIsSet = true;
}
}
@@ -539,6 +551,23 @@ void Measurement_statistics::unsetBkg_estimate()
{
m_Bkg_estimateIsSet = false;
}
std::string Measurement_statistics::getUnitCell() const
{
return m_Unit_cell;
}
void Measurement_statistics::setUnitCell(std::string const& value)
{
m_Unit_cell = value;
m_Unit_cellIsSet = true;
}
bool Measurement_statistics::unitCellIsSet() const
{
return m_Unit_cellIsSet;
}
void Measurement_statistics::unsetUnit_cell()
{
m_Unit_cellIsSet = false;
}
} // namespace org::openapitools::server::model

View File

@@ -163,6 +163,13 @@ public:
void setBkgEstimate(float const value);
bool bkgEstimateIsSet() const;
void unsetBkg_estimate();
/// <summary>
///
/// </summary>
std::string getUnitCell() const;
void setUnitCell(std::string const& value);
bool unitCellIsSet() const;
void unsetUnit_cell();
friend void to_json(nlohmann::json& j, const Measurement_statistics& o);
friend void from_json(const nlohmann::json& j, Measurement_statistics& o);
@@ -197,6 +204,8 @@ protected:
bool m_Detector_pixel_depthIsSet;
float m_Bkg_estimate;
bool m_Bkg_estimateIsSet;
std::string m_Unit_cell;
bool m_Unit_cellIsSet;
};

View File

@@ -500,6 +500,8 @@ components:
bkg_estimate:
type: number
format: float
unit_cell:
type: string
broker_status:
type: object
required:

View File

@@ -2,7 +2,7 @@
// Using OpenAPI licensed with Apache License 2.0
#include <vector>
#include <signal.h>
#include <csignal>
#include <fstream>
#include <nlohmann/json.hpp>
@@ -12,7 +12,7 @@
#include "JFJochBrokerHttp.h"
#include "JFJochBrokerParser.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#include "../frame_serialize/DumpCBORToFilePusher.h"
static Pistache::Http::Endpoint *httpEndpoint;
@@ -73,8 +73,6 @@ int main (int argc, char **argv) {
std::unique_ptr<JFJochReceiverService> receiver;
std::unique_ptr<ImagePusher> image_pusher;
ZMQContext context;
DiffractionExperiment experiment;
experiment.MaskChipEdges(true).MaskModuleEdges(true);
@@ -88,9 +86,15 @@ int main (int argc, char **argv) {
int32_t zmq_send_watermark = ParseInt32(input, "zmq_send_watermark", 100);
int32_t zmq_send_buffer_size = ParseInt32(input, "zmq_send_buffer_size", -1);
image_pusher = std::make_unique<ZMQStream2PusherGroup>(ParseStringArray(input, "zmq_image_addr"),
auto tmp = std::make_unique<ZMQStream2Pusher>(ParseStringArray(input, "zmq_image_addr"),
zmq_send_watermark,
zmq_send_buffer_size);
std::string preview_addr = ParseString(input, "zmq_preview_addr", "");
if (!preview_addr.empty())
tmp->PreviewSocket(preview_addr);
image_pusher = std::move(tmp);
} else if (pusher_type == "dump_cbor") {
image_pusher = std::make_unique<DumpCBORToFilePusher>();
} else

File diff suppressed because one or more lines are too long

View File

@@ -23,14 +23,19 @@ MESSAGE(STATUS "Jungfraujoch version: ${PACKAGE_VERSION}")
CONFIGURE_FILE("${CMAKE_CURRENT_SOURCE_DIR}/GitInfo.cpp.in" "${CMAKE_CURRENT_BINARY_DIR}/GitInfo.cpp" @ONLY)
ADD_LIBRARY( JFJochCommon STATIC
ADD_LIBRARY(JFJochLogger STATIC
Logger.cpp Logger.h
${CMAKE_CURRENT_BINARY_DIR}/GitInfo.cpp GitInfo.h
)
ADD_LIBRARY(JFJochZMQ STATIC ZMQWrappers.cpp ZMQWrappers.h)
ADD_LIBRARY(JFJochCommon STATIC
Coord.cpp Coord.h
DiffractionExperiment.cpp DiffractionExperiment.h
RawToConvertedGeometry.h
JFJochException.h
Definitions.h
${CMAKE_CURRENT_BINARY_DIR}/GitInfo.cpp GitInfo.h
ThreadSafeFIFO.h
DiffractionSpot.cpp DiffractionSpot.h
StatusVector.h
@@ -45,7 +50,6 @@ ADD_LIBRARY( JFJochCommon STATIC
RawToConvertedGeometryCore.h
Plot.h
../fpga/pcie_driver/jfjoch_fpga.h
ZMQWrappers.cpp ZMQWrappers.h
DatasetSettings.cpp DatasetSettings.h
ROIMap.cpp ROIMap.h
ROIElement.cpp ROIElement.h
@@ -57,7 +61,9 @@ ADD_LIBRARY( JFJochCommon STATIC
PixelMask.cpp PixelMask.h
)
TARGET_LINK_LIBRARIES(JFJochCommon Compression JFCalibration "$<BUILD_INTERFACE:libzmq-static>" -lrt)
TARGET_LINK_LIBRARIES(JFJochCommon JFJochLogger Compression JFCalibration -lrt)
TARGET_LINK_LIBRARIES(JFJochZMQ "$<BUILD_INTERFACE:libzmq-static>")
IF (CMAKE_CUDA_COMPILER)
TARGET_SOURCES(JFJochCommon PRIVATE CUDAWrapper.cu )

View File

@@ -9,6 +9,7 @@
#include "JFJochException.h"
#include "RawToConvertedGeometry.h"
#include "../fpga/pcie_driver/jfjoch_fpga.h"
#include "../include/spdlog/fmt/fmt.h"
using namespace std::literals::chrono_literals;
@@ -597,6 +598,20 @@ std::optional<UnitCell> DiffractionExperiment::GetUnitCell() const {
return dataset.GetUnitCell();
}
std::string DiffractionExperiment::GetUnitCellString() const {
auto uc = dataset.GetUnitCell();
if (uc.has_value()) {
return fmt::format("{:.1f}, {:.1f}, {:.1f}, {:.1f}, {:.1f} {:.1f}",
uc.value().a,
uc.value().b,
uc.value().c,
uc.value().alpha,
uc.value().beta,
uc.value().gamma);
} else
return "-";
}
Coord DiffractionExperiment::LabCoord(float detector_x, float detector_y) const {
// Assumes planar detector, 90 deg towards beam
return {(detector_x - GetBeamX_pxl()) * GetPixelSize_mm() ,
@@ -737,6 +752,8 @@ void DiffractionExperiment::FillMessage(StartMessage &message) const {
for (const auto &[x, y]: roi_mask.GetROINameMap())
message.roi_names.emplace_back(x);
message.data_reduction_factor_serialmx = GetDataReductionFactorSerialMX();
}
float DiffractionExperiment::GetPixelSize_mm() const {

View File

@@ -295,6 +295,7 @@ public:
std::string GetImageAppendix() const;
float GetPhotonEnergyMultiplier() const;
std::optional<UnitCell> GetUnitCell() const;
std::string GetUnitCellString() const;
int64_t GetSpaceGroupNumber() const;
bool GetSaveCalibration() const;
int64_t GetSummation() const;

View File

@@ -46,15 +46,6 @@ NUMAHWPolicy::NUMAHWPolicy(const std::string &policy) : name(policy) {
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Unknown NUMA policy");
}
NUMAHWPolicy::NUMAHWPolicy(const NUMAHWPolicy &other) : bindings(other.bindings), name(other.name), curr_thread(0) {}
NUMAHWPolicy &NUMAHWPolicy::operator=(const NUMAHWPolicy &other) {
bindings = other.bindings;
name = other.name;
curr_thread = 0;
return *this;
}
NUMABinding NUMAHWPolicy::GetBinding(uint32_t thread) {
if (bindings.empty())
return NUMABinding{.cpu_node = -1, .mem_node = -1, .gpu = -1};
@@ -62,14 +53,6 @@ NUMABinding NUMAHWPolicy::GetBinding(uint32_t thread) {
return bindings.at(thread % bindings.size());
}
NUMABinding NUMAHWPolicy::GetBinding() {
return GetBinding(curr_thread++);
}
void NUMAHWPolicy::Bind() {
Bind(GetBinding());
}
void NUMAHWPolicy::Bind(uint32_t thread) {
Bind(GetBinding(thread));
}

View File

@@ -17,19 +17,14 @@ struct NUMABinding {
class NUMAHWPolicy {
std::string name;
std::vector<NUMABinding> bindings;
std::atomic<uint32_t> curr_thread = 0;
public:
NUMAHWPolicy() = default;
explicit NUMAHWPolicy(const std::string& policy);
NUMAHWPolicy(const NUMAHWPolicy& other);
NUMAHWPolicy& operator=(const NUMAHWPolicy& other);
NUMABinding GetBinding(uint32_t thread);
NUMABinding GetBinding(); // round-robin
const std::string &GetName() const;
void Bind(uint32_t thread);
void Bind(); // round-robin
static void Bind(const NUMABinding &binding);
static void RunOnNode(int32_t cpu_node);
static void MemOnNode(int32_t mem_node);

View File

@@ -8,8 +8,7 @@ ZMQContext::ZMQContext() {
// Default is to have 2 I/O threads per ZMQ context
if (zmq_ctx_set(context, ZMQ_IO_THREADS, 2) != 0)
throw JFJochException(JFJochExceptionCategory::ZeroMQ,
"Cannot set number of I/O threads");
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set number of I/O threads");
}
ZMQContext &ZMQContext::NumThreads(int32_t threads) {
@@ -27,7 +26,7 @@ void *ZMQContext::GetContext() const {
return context;
}
ZMQSocket::ZMQSocket(ZMQContext &context, ZMQSocketType in_socket_type) : socket_type(in_socket_type) {
ZMQSocket::ZMQSocket(ZMQSocketType in_socket_type) : socket_type(in_socket_type) {
socket = zmq_socket(context.GetContext(), static_cast<int>(socket_type));
if (socket == nullptr)

View File

@@ -42,14 +42,15 @@ public:
class ZMQSocket {
std::mutex m;
ZMQContext context;
ZMQSocketType socket_type;
void *socket;
void SetSocketOption(int32_t option_name, int32_t value);
public:
ZMQSocket(ZMQSocket &socket) = delete;
const ZMQSocket& operator=(ZMQSocket &socket) = delete;
ZMQSocket(ZMQContext &context, ZMQSocketType socket_type);
~ZMQSocket();
explicit ZMQSocket(ZMQSocketType socket_type);
~ZMQSocket();
void Connect(const std::string& addr);
void Disconnect(const std::string& addr);
void Bind(const std::string& addr);

View File

@@ -1,6 +1,2 @@
INSTALL(TARGETS sls_detector_put sls_detector_get RUNTIME)
ADD_LIBRARY(JFJochDetector STATIC DetectorWrapper.cpp DetectorWrapper.h)
TARGET_LINK_LIBRARIES(JFJochDetector JFJochCommon slsSupportShared slsDetectorShared)
TARGET_LINK_LIBRARIES(JFJochDetector JFJochCommon slsDetectorStatic slsSupportStatic)

View File

@@ -4,7 +4,7 @@ TARGET_LINK_LIBRARIES(JFJochDevice JFJochCommon)
ADD_EXECUTABLE(jfjoch_pcie_status jfjoch_pcie_status.cpp)
TARGET_LINK_LIBRARIES(jfjoch_pcie_status JFJochDevice )
INSTALL(TARGETS jfjoch_pcie_status RUNTIME)
INSTALL(TARGETS jfjoch_pcie_status RUNTIME COMPONENT jfjoch)
ADD_EXECUTABLE(jfjoch_pcie_set_network jfjoch_pcie_set_network.cpp)
TARGET_LINK_LIBRARIES(jfjoch_pcie_set_network JFJochDevice )

View File

@@ -1,5 +1,5 @@
PACKAGE_NAME=jfjoch
PACKAGE_VERSION=0.1
PACKAGE_VERSION=1.0.0
DEST_MODULE_LOCATION=/extra
BUILT_MODULE_NAME=jfjoch
@@ -7,4 +7,4 @@ BUILT_MODULE_LOCATION=src/
MAKE="'make' -C src/ all"
CLEAN="'make' -C src/ clean"
AUTOINSTALL="yes"
AUTOINSTALL="yes"

View File

@@ -1,12 +1,11 @@
#!/bin/bash
# Copyright (2019-2023) Paul Scherrer Institute
VERSION=0.1
VERSION=1.0.0
mkdir -p /usr/src/jfjoch-${VERSION}/src
cp dkms.conf /usr/src/jfjoch-${VERSION}
cp *.c *.h Makefile ../../common/Definitions.h /usr/src/jfjoch-0.1/src
sed -i "s,../../common/Definitions.h,Definitions.h," /usr/src/jfjoch-0.1/src/jfjoch_drv.h
cp *.c *.h Makefile /usr/src/jfjoch-${VERSION}/src
dkms add -m jfjoch -v ${VERSION}
dkms install -m jfjoch -v ${VERSION}

View File

@@ -11,7 +11,7 @@
MODULE_AUTHOR("Filip Leonarski; Paul Scherrer Institute");
MODULE_DESCRIPTION("Jungfraujoch device module");
MODULE_LICENSE("GPL");
MODULE_VERSION("0.1");
MODULE_VERSION("1.0.0");
#define XDMA_GEN4_x8 (0x9048)
#define XDMA_GEN3_x16 (0x903F)

View File

@@ -0,0 +1,14 @@
#!/bin/bash
# Taken from https://schneide.blog/2015/08/10/packaging-kernel-modulesdrivers-using-dkms/
VERSION="1.0.0"
occurrences=`/usr/sbin/dkms status | grep jfjoch | grep ${VERSION} | wc -l`
if [ ! occurrences > 0 ]; then
/usr/sbin/dkms add -m jfjoch -v ${VERSION}
fi
/usr/sbin/dkms build -m jfjoch -v ${VERSION}
/usr/sbin/dkms install -m jfjoch -v ${VERSION}
exit 0

View File

@@ -0,0 +1,8 @@
#!/bin/bash
# Taken from https://schneide.blog/2015/08/10/packaging-kernel-modulesdrivers-using-dkms/
VERSION="1.0.0"
/usr/sbin/dkms remove -m jfjoch -v ${VERSION} --all
exit 0

View File

@@ -636,6 +636,8 @@ namespace {
message.roi_names = j["roi_names"];
if (j.contains("write_master_file"))
message.write_master_file = j["write_master_file"];
if (j.contains("data_reduction_factor_serialmx"))
message.data_reduction_factor_serialmx = j["data_reduction_factor_serialmx"];
} catch (const std::exception &e) {
throw JFJochException(JFJochExceptionCategory::CBORError,
"Cannot parse user_data as valid JSON " + std::string(e.what()));

View File

@@ -316,6 +316,8 @@ inline void CBOR_ENC_START_USER_DATA(CborEncoder& encoder, const char* key,
j["gain_file_names"] = message.gain_file_names;
if (message.write_master_file)
j["write_master_file"] = message.write_master_file.value();
if (message.data_reduction_factor_serialmx)
j["data_reduction_factor_serialmx"] = message.data_reduction_factor_serialmx.value();
auto str = j.dump();

View File

@@ -20,10 +20,8 @@ TARGET_LINK_LIBRARIES(CBORStream2FrameSerialize tinycbor)
ADD_LIBRARY(ImagePusher STATIC
ImagePusher.cpp ImagePusher.h
TestImagePusher.cpp TestImagePusher.h
ZMQStream2PusherGroup.cpp ZMQStream2PusherGroup.h
ZMQStream2Pusher.cpp
ZMQStream2Pusher.h
ZMQStream2Pusher.cpp ZMQStream2Pusher.h
DumpCBORToFilePusher.cpp
DumpCBORToFilePusher.h)
TARGET_LINK_LIBRARIES(ImagePusher CBORStream2FrameSerialize JFJochCommon Compression)
TARGET_LINK_LIBRARIES(ImagePusher JFJochZMQ CBORStream2FrameSerialize JFJochCommon Compression)

View File

@@ -168,6 +168,8 @@ struct StartMessage {
}
std::string user_data;
std::optional<float> data_reduction_factor_serialmx;
};
struct EndMessage {

View File

@@ -1,66 +1,117 @@
// Copyright (2019-2024) Paul Scherrer Institute
#include "ZMQStream2Pusher.h"
#include "CBORStream2Serializer.h"
ZMQStream2Pusher::ZMQStream2Pusher(ZMQContext &context, const std::string &addr, int32_t send_buffer_high_watermark,
int32_t send_buffer_size)
: socket(context, ZMQSocketType::Push) {
Bind(addr, send_buffer_high_watermark, send_buffer_size);
}
ZMQStream2Pusher::ZMQStream2Pusher(const std::vector<std::string> &addr,
int32_t send_buffer_high_watermark, int32_t send_buffer_size)
: serialization_buffer(256*1024*1024),
serializer(serialization_buffer.data(), serialization_buffer.size()),
preview_counter(std::chrono::seconds(1)) {
if (addr.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writer ZMQ address provided");
ZMQStream2Pusher::ZMQStream2Pusher(const std::string &addr, int32_t send_buffer_high_watermark,
int32_t send_buffer_size)
: context(std::make_unique<ZMQContext>()),
socket(*context, ZMQSocketType::Push) {
Bind(addr, send_buffer_high_watermark, send_buffer_size);
}
void ZMQStream2Pusher::Bind(const std::string &addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size) {
if (send_buffer_size > 0)
socket.SendBufferSize(send_buffer_size);
if (send_buffer_high_watermark > 0)
socket.SendWaterMark(send_buffer_high_watermark);
socket.SendTimeout(std::chrono::seconds(5)); // 5 seconds should be more than enough to flush buffers and to still give fast response
socket.Bind(addr);
}
void ZMQStream2Pusher::StartDataCollection(StartMessage &message) {
size_t approx_size = 1024*1024;
for (const auto &x : message.pixel_mask)
approx_size += x.size;
std::vector<uint8_t> serialization_buffer(approx_size);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
serializer.SerializeSequenceStart(message);
if (!socket.Send(serialization_buffer.data(), serializer.GetBufferSize(), true))
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Timeout on pushing start message on addr " + GetAddress());
for (const auto &a : addr) {
auto s = std::make_unique<ZMQSocket>(ZMQSocketType::Push);
if (send_buffer_size > 0)
s->SendBufferSize(send_buffer_size);
if (send_buffer_high_watermark > 0)
s->SendWaterMark(send_buffer_high_watermark);
s->SendTimeout(std::chrono::seconds(5)); // 5 seconds should be more than enough to flush buffers and to still give fast response
s->Bind(a);
socket.emplace_back(std::move(s));
}
}
bool ZMQStream2Pusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
return socket.Send(image_data, image_size, false);
if (preview_socket) {
if (preview_counter.GeneratePreview())
preview_socket->Send(image_data, image_size, false);
}
if (!socket.empty()) {
auto socket_number = (image_number / images_per_file) % socket.size();
return socket[socket_number]->Send(image_data, image_size, false);
} else
return false;
}
void ZMQStream2Pusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
ZeroCopyReturnValue *z) {
socket.SendZeroCopy(image_data,image_size, z);
ZeroCopyReturnValue *z) {
if (preview_socket) {
if (preview_counter.GeneratePreview())
preview_socket->Send(image_data, image_size, false);
}
if (!socket.empty()) {
auto socket_number = (image_number / images_per_file) % socket.size();
socket[socket_number]->SendZeroCopy(image_data, image_size, z);
} else
z->release();
}
bool ZMQStream2Pusher::EndDataCollection(const EndMessage &message) {
std::vector<uint8_t> serialization_buffer(80 * 1024 * 1024);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
void ZMQStream2Pusher::StartDataCollection(StartMessage& message) {
if (message.images_per_file < 1)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Images per file cannot be zero or negative");
images_per_file = message.images_per_file;
serializer.SerializeSequenceEnd(message);
return socket.Send(serialization_buffer.data(), serializer.GetBufferSize(), true); // Blocking
serializer.SerializeSequenceStart(message);
for (auto &s: socket) {
if (!s->Send(serialization_buffer.data(), serializer.GetBufferSize(), true))
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Timeout on pushing start message on addr "
+ s->GetEndpointName());
if (message.write_master_file) {
message.write_master_file = false;
serializer.SerializeSequenceStart(message);
}
}
if (preview_socket)
preview_socket->Send(serialization_buffer.data(), serializer.GetBufferSize(), true);
}
bool ZMQStream2Pusher::SendCalibration(const CompressedImage &message) {
std::vector<uint8_t> serialization_buffer(80 * 1024 * 1024);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
if (socket.empty())
return false;
serializer.SerializeCalibration(message);
return socket.Send(serialization_buffer.data(), serializer.GetBufferSize(), true); // Blocking
return socket[0]->Send(serialization_buffer.data(), serializer.GetBufferSize(), true);
}
std::string ZMQStream2Pusher::GetAddress() {
return socket.GetEndpointName();
bool ZMQStream2Pusher::EndDataCollection(const EndMessage& message) {
serializer.SerializeSequenceEnd(message);
bool ret = true;
for (auto &s: socket) {
if (!s->Send(serialization_buffer.data(), serializer.GetBufferSize(), true))
ret = false;
}
if (preview_socket)
preview_socket->Send(serialization_buffer.data(), serializer.GetBufferSize(), true);
return ret;
}
std::vector<std::string> ZMQStream2Pusher::GetAddress() {
std::vector<std::string> ret;
for (auto &p: socket)
ret.push_back(p->GetEndpointName());
return ret;
}
ZMQStream2Pusher &ZMQStream2Pusher::PreviewSocket(const std::string &addr) {
preview_socket = std::make_unique<ZMQSocket>(ZMQSocketType::Pub);
preview_socket->Bind(addr);
return *this;
}
std::string ZMQStream2Pusher::GetPreviewAddress() {
if (preview_socket)
return preview_socket->GetEndpointName();
else
return "";
}

View File

@@ -3,30 +3,40 @@
#ifndef JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H
#define JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H
#include <mutex>
#include "ImagePusher.h"
#include "../common/ZMQWrappers.h"
#include "../preview/PreviewCounter.h"
class ZMQStream2Pusher : public ImagePusher {
std::unique_ptr<ZMQContext> context;
ZMQSocket socket;
public:
ZMQStream2Pusher(ZMQContext& context,
const std::string& addr,
int32_t send_buffer_high_watermark = -1,
int32_t send_buffer_size = -1);
std::vector<uint8_t> serialization_buffer;
CBORStream2Serializer serializer;
explicit ZMQStream2Pusher(const std::string& addr,
std::vector<std::unique_ptr<ZMQSocket>> socket;
std::unique_ptr<ZMQSocket> preview_socket;
PreviewCounter preview_counter;
int64_t images_per_file = 1;
public:
explicit ZMQStream2Pusher(const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1,
int32_t send_buffer_size = -1);
void Bind(const std::string& addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size);
ZMQStream2Pusher& PreviewSocket(const std::string& addr);
std::string GetPreviewAddress();
std::vector<std::string> GetAddress();
// Strictly serial, as order of these is important
void StartDataCollection(StartMessage& message) override;
bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) override;
bool EndDataCollection(const EndMessage &message) override;
bool EndDataCollection(const EndMessage& message) override;
bool SendCalibration(const CompressedImage& message) override;
std::string GetAddress();
// Thread-safe
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) override;
bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
};
#endif //JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H

View File

@@ -1,75 +0,0 @@
// Copyright (2019-2024) Paul Scherrer Institute
#include "ZMQStream2PusherGroup.h"
#include "CBORStream2Serializer.h"
ZMQStream2PusherGroup::ZMQStream2PusherGroup(ZMQContext &zmq_context, const std::vector<std::string> &addr,
int32_t send_buffer_high_watermark, int32_t send_buffer_size) {
if (addr.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"No writer ZMQ address provided");
for (const auto &a : addr)
pusher.emplace_back(std::make_unique<ZMQStream2Pusher>
(zmq_context, a, send_buffer_high_watermark, send_buffer_size));
}
ZMQStream2PusherGroup::ZMQStream2PusherGroup(const std::vector<std::string> &addr,
int32_t send_buffer_high_watermark, int32_t send_buffer_size) {
if (addr.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"No writer ZMQ address provided");
for (const auto &a : addr)
pusher.emplace_back(std::make_unique<ZMQStream2Pusher>
(a, send_buffer_high_watermark, send_buffer_size));
}
bool ZMQStream2PusherGroup::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
if (!pusher.empty()) {
auto socket_number = (image_number / images_per_file) % pusher.size();
return pusher[socket_number]->SendImage(image_data, image_size, image_number);
} else
return false;
}
void ZMQStream2PusherGroup::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
ZeroCopyReturnValue *z) {
if (!pusher.empty()) {
auto socket_number = (image_number / images_per_file) % pusher.size();
pusher[socket_number]->SendImage(image_data, image_size, image_number, z);
}
}
void ZMQStream2PusherGroup::StartDataCollection(StartMessage& message) {
if (message.images_per_file < 1)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Images per file cannot be zero or negative");
images_per_file = message.images_per_file;
for (auto &p: pusher) {
p->StartDataCollection(message);
message.write_master_file = false;
}
}
bool ZMQStream2PusherGroup::SendCalibration(const CompressedImage &message) {
if (pusher.empty())
return false;
return pusher[0]->SendCalibration(message);
}
bool ZMQStream2PusherGroup::EndDataCollection(const EndMessage& message) {
bool ret = true;
for (auto &p: pusher) {
if (!p->EndDataCollection(message))
ret = false;
}
return ret;
}
std::vector<std::string> ZMQStream2PusherGroup::GetAddress() {
std::vector<std::string> ret;
for (auto &p: pusher)
ret.push_back(p->GetAddress());
return ret;
}

View File

@@ -1,29 +0,0 @@
// Copyright (2019-2024) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_ZMQSTREAM2PUSHERGROUP_H
#define JUNGFRAUJOCH_ZMQSTREAM2PUSHERGROUP_H
#include "ImagePusher.h"
#include "ZMQStream2Pusher.h"
#include "../common/ZMQWrappers.h"
class ZMQStream2PusherGroup : public ImagePusher {
std::vector<std::unique_ptr<ZMQStream2Pusher>> pusher;
int64_t images_per_file = 1;
public:
ZMQStream2PusherGroup(ZMQContext &context, const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
// High performance implementation, where each socket has dedicated ZMQ context
explicit ZMQStream2PusherGroup(const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
void StartDataCollection(StartMessage& message) override;
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) override;
bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
bool EndDataCollection(const EndMessage& message) override;
bool SendCalibration(const CompressedImage& message) override;
std::vector<std::string> GetAddress();
};
#endif //JUNGFRAUJOCH_ZMQSTREAM2PUSHERGROUP_H

File diff suppressed because it is too large Load Diff

View File

@@ -10,6 +10,7 @@
"@mui/icons-material": "^5.10.6",
"@mui/material": "^5.10.6",
"@mui/x-data-grid": "^5.17.14",
"@redocly/cli": "^1.12.2",
"@types/jest": "^29.2.4",
"@types/node": "^18.11.13",
"@types/react": "^18.0.26",
@@ -25,6 +26,7 @@
"scripts": {
"start": "REACT_APP_VERSION=$(git rev-parse --short HEAD) PORT=8000 react-scripts start",
"build": "REACT_APP_VERSION=$(git rev-parse --short HEAD) react-scripts build",
"redocly": "redocly build-docs ../broker/jfjoch_api.yaml --output=build/openapi.html",
"test": "react-scripts test",
"eject": "react-scripts eject",
"openapi": "openapi --input ../broker/jfjoch_api.yaml --output ./src/openapi"

View File

@@ -145,7 +145,8 @@ class App extends Component<MyProps, MyState> {
href="mailto:filip.leonarski@psi.ch">Filip Leonarski</a> <br/>
For more information see <a href="https://doi.org/10.1107/S1600577522010268"><i>J. Synchrotron
Rad.</i> (2023). <b>30</b>, 227234</a> <br/>
Build: {process.env.REACT_APP_VERSION}</center>
Build: {process.env.REACT_APP_VERSION}&nbsp;&nbsp;&nbsp;
<a href="/frontend/openapi.html">API reference</a></center>
<br/>
</ThemeProvider>
}

View File

@@ -9,7 +9,7 @@ type MyProps = {};
class BkgEstimatePlot extends Component<MyProps> {
render() {
return <Paper style={{textAlign: 'center'}} sx={{height: 450, width: "100%"}}>
return <Paper style={{textAlign: 'center'}} sx={{height: 500, width: "100%"}}>
<Box sx={{width: "100%", height: 50}}>
<br/>
<center><strong>Background estimate</strong></center>

View File

@@ -48,7 +48,7 @@ class MeasurementStatistics extends Component<MyProps, MyState> {
}
render() {
return <Paper style={{textAlign: 'center'}} sx={{ height: 450, width: '100%' }}>
return <Paper style={{textAlign: 'center'}} sx={{ height: 500, width: '100%' }}>
<Grid container spacing={0}>
<Grid item xs={1}/>
<Grid item xs={10}>
@@ -98,6 +98,10 @@ class MeasurementStatistics extends Component<MyProps, MyState> {
<TableCell align="right">{(this.state.s.bkg_estimate !== undefined)
? this.state.s.bkg_estimate.toPrecision(7) : "-"}</TableCell>
</TableRow>
<TableRow>
<TableCell component="th" scope="row"> Unit cell: </TableCell>
<TableCell align="right">{this.state.s.unit_cell}</TableCell>
</TableRow>
</TableBody>
</Table>
</TableContainer>

View File

@@ -28,7 +28,7 @@ class PreviewImage extends Component<MyProps, MyState> {
show_indexed: false,
resolution_ring: 0.5
},
s_url: "",
s_url: null,
update: true,
connection_error: true
}
@@ -124,9 +124,12 @@ class PreviewImage extends Component<MyProps, MyState> {
clearInterval(this.interval);
}
preview() {
return <div><br/>
<Stack spacing={2} direction="row" sx={{ mb: 1 }} alignItems="center">
render() {
return <Paper sx={{height: 1050, width: 850, m: 2}}
component={Stack}
direction="column">
<br/>
<Stack spacing={2} direction="row" sx={{mb: 1}} alignItems="center">
&nbsp;&nbsp;<strong>Preview image</strong>
<Switch disabled={this.state.connection_error} checked={this.state.update}
onChange={this.updateToggle} name="Update"/>
@@ -140,13 +143,13 @@ class PreviewImage extends Component<MyProps, MyState> {
<Switch disabled={this.state.connection_error} checked={this.state.settings.show_indexed}
onChange={this.showIndexedToggle} name="Show ROI"/>
Show only indexed images&nbsp;&nbsp;&nbsp;
<Box sx={{ width: 200 }}>
<Box sx={{width: 200}}>
<Slider disabled={this.state.connection_error}
value={Number(this.state.settings.saturation)} min={1} max={80}
onChange={this.setSaturation} valueLabelDisplay="auto"/> <br/>Saturation value
</Box>
&nbsp;&nbsp;&nbsp;
<Box sx={{ width: 200 }}>
<Box sx={{width: 200}}>
<Slider disabled={this.state.connection_error}
value={(this.state.settings.resolution_ring === undefined) ? 0.5 : Number(this.state.settings.resolution_ring)}
min={0.5} max={5.0} step={0.1}
@@ -155,8 +158,7 @@ class PreviewImage extends Component<MyProps, MyState> {
</Stack>
<br/>
{
this.state.s_url !== null ?
{(!this.state.connection_error && (this.state.s_url !== null)) ?
<Stack
direction="row"
justifyContent="center"
@@ -164,20 +166,13 @@ class PreviewImage extends Component<MyProps, MyState> {
>
<TransformWrapper>
<TransformComponent>
<img src={this.state.s_url} alt="Live preview" style={{maxWidth: "100%", maxHeight: 900}}/>
<img src={this.state.s_url} alt="Live preview"
style={{maxWidth: "100%", maxHeight: 900}}/>
</TransformComponent>
</TransformWrapper>
</Stack>: <div/>
</Stack> : <div>Preview not available</div>
}
<br/>
</div>
}
render() {
return <Paper sx={{height: 1050, width: 850, m: 2}}
component={Stack}
direction="column">
{(!this.state.connection_error && (this.state.s_url !== null)) ? this.preview() : "Preview not available"}
</Paper>
}
}

View File

@@ -31,6 +31,7 @@ export type measurement_statistics = {
detector_height?: number;
detector_pixel_depth?: measurement_statistics.detector_pixel_depth;
bkg_estimate?: number;
unit_cell?: string;
};
export namespace measurement_statistics {

View File

@@ -22,7 +22,7 @@ IF (CMAKE_CUDA_COMPILER)
FetchContent_Declare(
fast-indexer
GIT_REPOSITORY https://github.com/fleon-psi/fast-feedback-indexer/
GIT_TAG 651cbc9
GIT_TAG 66c3f44
)
FetchContent_MakeAvailable(fast-indexer)

View File

@@ -27,8 +27,12 @@ MXAnalyzer::MXAnalyzer(const DiffractionExperiment &in_experiment)
: experiment(in_experiment) {
auto uc = experiment.GetUnitCell();
if (uc) {
do_indexing = true;
indexer.Setup(uc.value());
try {
indexer = std::make_unique<IndexerWrapper>();
indexer->Setup(uc.value());
} catch (const std::exception &e) {
throw JFJochException(JFJochExceptionCategory::GPUCUDAError, e.what());
}
}
if (experiment.IsSpotFindingEnabled())
find_spots = true;
@@ -73,13 +77,13 @@ void MXAnalyzer::Process(DataMessage &message, const SpotFindingSettings& settin
for (const auto &spot: spots_out)
message.spots.push_back(spot);
if (do_indexing && settings.indexing) {
if (indexer && settings.indexing) {
std::vector<Coord> recip;
recip.reserve(spots_out.size());
for (const auto &i: spots_out)
recip.push_back(i.ReciprocalCoord(experiment));
auto indexer_result = indexer.Run(recip, settings.indexing_tolerance);
auto indexer_result = indexer->Run(recip, settings.indexing_tolerance);
if (!indexer_result.empty()) {
message.indexing_result = true;

View File

@@ -9,8 +9,7 @@
class MXAnalyzer {
const DiffractionExperiment &experiment;
IndexerWrapper indexer;
bool do_indexing = false;
std::unique_ptr<IndexerWrapper> indexer;
bool find_spots = false;
std::vector<DiffractionSpot> spots;
constexpr static const float spot_distance_threshold_pxl = 2.0f;

View File

@@ -1,4 +1,3 @@
FIND_PACKAGE(TIFF COMPONENTS CXX REQUIRED)
FIND_PACKAGE(JPEG REQUIRED)
ADD_LIBRARY(JFJochPreview STATIC
@@ -9,13 +8,7 @@ ADD_LIBRARY(JFJochPreview STATIC
TARGET_LINK_LIBRARIES(JFJochPreview PUBLIC JFJochCommon)
IF((EXISTS ${TIFF_INCLUDE_DIR}/tiffio.hxx) AND (EXISTS ${TIFF_INCLUDE_DIR}/tiffio.h))
TARGET_INCLUDE_DIRECTORIES(JFJochPreview PRIVATE ${TIFF_INCLUDE_DIR})
TARGET_LINK_LIBRARIES(JFJochPreview PUBLIC ${TIFF_LIBRARIES})
MESSAGE(STATUS "TIFF headers present and library included")
ELSE()
MESSAGE(FATAL_ERROR "TIFF headers tiffio.h and tiffio.hxx not present")
ENDIF()
TARGET_LINK_LIBRARIES(JFJochPreview PUBLIC tiff tiffxx)
IF (EXISTS ${JPEG_INCLUDE_DIR}/jpeglib.h)
TARGET_INCLUDE_DIRECTORIES(JFJochPreview PRIVATE ${JPEG_INCLUDE_DIR})

View File

@@ -33,6 +33,7 @@ constexpr const static rgb gray = {.r = 0xbe, .g = 0xbe, .b = 0xbe};
PreviewImage::PreviewImage(const DiffractionExperiment &in_experiment) :
experiment(in_experiment),
initialized(false),
xpixel(experiment.GetXPixelsNum()),
ypixel(experiment.GetYPixelsNum()),
beam_x(experiment.GetBeamX_pxl()),
@@ -47,6 +48,7 @@ void PreviewImage::UpdateImage(const void *in_uncompressed_image,
const std::vector<SpotToSave> &in_spots) {
if (counter.GeneratePreview()) {
std::unique_lock<std::mutex> ul(m);
initialized = true;
memcpy(uncompressed_image.data(), in_uncompressed_image, xpixel * ypixel * pixel_depth_bytes);
spots = in_spots;
}
@@ -109,6 +111,10 @@ std::string PreviewImage::GenerateJPEG(const PreviewJPEGSettings &settings) cons
{
// JPEG compression is outside the critical loop protected by m
std::unique_lock<std::mutex> ul(m);
if (!initialized)
return {};
if (!pixel_is_signed) {
if (pixel_depth_bytes == 2)
v = GenerateRGB<uint16_t>((uint16_t *) uncompressed_image.data(), xpixel * ypixel,
@@ -141,6 +147,9 @@ std::string PreviewImage::GenerateJPEG(const PreviewJPEGSettings &settings) cons
std::string PreviewImage::GenerateTIFF() const {
std::unique_lock<std::mutex> ul(m);
if (!initialized)
return {};
std::string s = WriteTIFFToString(const_cast<uint8_t *>(uncompressed_image.data()),
xpixel, ypixel, pixel_depth_bytes, pixel_is_signed);
return s;
@@ -167,6 +176,8 @@ std::vector<uint16_t> GenerateDioptasPreview(const void* input, size_t xpixel, s
std::string PreviewImage::GenerateTIFFDioptas() const {
std::unique_lock<std::mutex> ul(m);
if (!initialized)
return {};
std::vector<uint16_t> vec;
if (pixel_is_signed) {

View File

@@ -29,6 +29,7 @@ class PreviewImage {
mutable std::mutex m;
DiffractionExperiment experiment;
bool initialized;
const ROIMap roi_map;
std::vector<uint8_t> uncompressed_image;
std::vector<SpotToSave> spots;

View File

@@ -15,4 +15,4 @@ TARGET_LINK_LIBRARIES(JFJochReceiver ImagePusher JFJochImageAnalysis JFJochAcqui
ADD_EXECUTABLE(jfjoch_action_test jfjoch_action_test.cpp)
TARGET_LINK_LIBRARIES(jfjoch_action_test JFJochReceiver)
INSTALL(TARGETS jfjoch_action_test RUNTIME)
INSTALL(TARGETS jfjoch_action_test RUNTIME COMPONENT jfjoch)

View File

@@ -67,8 +67,9 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment,
images_to_go.Put(i);
// Setup frames summation and forwarding
for (int i = 0; i < frame_transformation_nthreads; i++) {
auto handle = std::async(std::launch::async, &JFJochReceiver::FrameTransformationThread, this);
for (uint32_t i = 0; i < frame_transformation_nthreads; i++) {
auto handle = std::async(std::launch::async, &JFJochReceiver::FrameTransformationThread,
this, i);
frame_transformation_futures.emplace_back(std::move(handle));
}
@@ -255,20 +256,22 @@ void JFJochReceiver::RetrievePedestal() {
}
}
void JFJochReceiver::FrameTransformationThread() {
void JFJochReceiver::FrameTransformationThread(uint32_t threadid) {
std::unique_ptr<MXAnalyzer> analyzer;
try {
numa_policy.Bind();
numa_policy.Bind(threadid);
analyzer = std::make_unique<MXAnalyzer>(experiment);
} catch (const JFJochException &e) {
frame_transformation_ready.count_down();
logger.Error("HW bind error {}", e.what());
logger.Error("Thread setup error {}", e.what());
Cancel(e);
return;
}
FrameTransformation transformation(experiment);
MXAnalyzer analyzer(experiment);
frame_transformation_ready.count_down();
uint16_t az_int_min_bin = std::floor(az_int_mapping.QToBin(experiment.GetLowQForBkgEstimate_recipA()));
@@ -313,7 +316,7 @@ void JFJochReceiver::FrameTransformationThread() {
adu_histogram_module[module_abs_number].Add(*output);
az_int_profile_image.Add(*output);
analyzer.ReadFromFPGA(output, local_spot_finding_settings, module_abs_number);
analyzer->ReadFromFPGA(output, local_spot_finding_settings, module_abs_number);
transformation.ProcessModule(output, d);
} else
@@ -332,7 +335,7 @@ void JFJochReceiver::FrameTransformationThread() {
continue;
}
analyzer.Process(message, local_spot_finding_settings);
analyzer->Process(message, local_spot_finding_settings);
message.receiver_free_send_buf = send_buf_ctrl.GetAvailBufLocations();
message.az_int_profile = az_int_profile_image.GetResult();
@@ -369,12 +372,12 @@ void JFJochReceiver::FrameTransformationThread() {
size_t image_size = transformation.CompressImage(writer_buffer + serializer.GetImageAppendOffset());
serializer.AppendImage(image_size);
compressed_size += image_size;
image_pusher.SendImage(writer_buffer, serializer.GetBufferSize(), image_number, loc);
image_pusher.SendImage(writer_buffer, serializer.GetBufferSize(), message.number, loc);
images_sent++; // Handle case when image not sent properly
UpdateMaxImage(image_number);
UpdateMaxImage(message.number);
logger.Debug("Frame transformation thread - done sending image {}", image_number);
logger.Debug("Frame transformation thread - done sending image {} / {}", image_number, message.number);
} catch (const JFJochException &e) {
logger.ErrorException(e);
Cancel(e);
@@ -552,10 +555,10 @@ JFJochReceiverStatus JFJochReceiver::GetStatus() const {
if ((experiment.GetImageNum() > 0) && (compressed_size > 0)) {
ret.compressed_ratio = static_cast<double> ((images_sent + images_skipped)
* experiment.GetPixelDepth()
* experiment.GetModulesNum()
* RAW_MODULE_SIZE)
/ static_cast<double> (compressed_size);
* experiment.GetPixelDepth()
* experiment.GetModulesNum()
* RAW_MODULE_SIZE)
/ static_cast<double> (compressed_size);
}
ret.progress = GetProgress();

View File

@@ -114,7 +114,7 @@ class JFJochReceiver {
LossyFilter serialmx_filter;
void AcquireThread(uint16_t data_stream);
void FrameTransformationThread();
void FrameTransformationThread(uint32_t threadid);
void Cancel(const JFJochException &e);
void FinalizeMeasurement();
void RetrievePedestal();

View File

@@ -2,7 +2,7 @@
#include "JFJochReceiverTest.h"
#include "JFJochReceiverService.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#include "../frame_serialize/TestImagePusher.h"
#define STORAGE_CELL_FOR_TEST 11

View File

@@ -64,7 +64,8 @@ TEST_CASE("CBORSerialize_Start", "[CBOR]") {
.total_flux = 123,
.attenuator_transmission = 0.345,
.write_master_file = true,
.user_data = "Some random string 12345"
.user_data = "Some random string 12345",
.data_reduction_factor_serialmx = 0.75
};
REQUIRE_NOTHROW(serializer.SerializeSequenceStart(message));
@@ -141,6 +142,7 @@ TEST_CASE("CBORSerialize_Start", "[CBOR]") {
CHECK(output_message.countrate_correction_enabled == message.countrate_correction_enabled);
CHECK(output_message.flatfield_enabled == message.flatfield_enabled);
CHECK(output_message.write_master_file == message.write_master_file);
CHECK(output_message.data_reduction_factor_serialmx == message.data_reduction_factor_serialmx);
}
TEST_CASE("CBORSerialize_Start_PixelMask", "[CBOR]") {

View File

@@ -221,8 +221,6 @@ TEST_CASE("HDF5Writer", "[HDF5][Full]") {
TEST_CASE("HDF5Writer_Socket", "[HDF5][Full]") {
{
ZMQContext c;
RegisterHDF5Filter();
DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36));
std::vector<SpotToSave> spots;
@@ -233,10 +231,10 @@ TEST_CASE("HDF5Writer_Socket", "[HDF5][Full]") {
x.FillMessage(start_message);
HDF5Writer file_set(start_message);
file_set.SetupSocket(c, "ipc://#1");
file_set.SetupSocket("ipc://#1");
std::vector<uint16_t> image(x.GetPixelsNum());
ZMQSocket s(c, ZMQSocketType::Sub);
ZMQSocket s(ZMQSocketType::Sub);
s.Connect("ipc://#1");
s.SubscribeAll();
s.ReceiveTimeout(std::chrono::seconds(5));

View File

@@ -52,13 +52,10 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver]
aq_devices.Add(std::move(test));
ZMQContext context;
ZMQStream2Pusher pusher(context, "ipc://*");
StreamWriter writer(context, logger, pusher.GetAddress());
ZMQStream2Pusher pusher({"ipc://*"});
StreamWriter writer(logger, pusher.GetAddress()[0]);
auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer);
context.NumThreads(4);
JFJochReceiverService service(aq_devices, logger, pusher);
service.NumThreads(nthreads);
@@ -127,11 +124,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_min_pix_2", "[JFJoc
test->SetInternalGeneratorFrame((uint16_t *) image_raw_geom.data() + m * RAW_MODULE_SIZE, m);
aq_devices.Add(std::move(test));
ZMQContext context;
context.NumThreads(4);
ZMQStream2Pusher pusher(context, "ipc://*");
StreamWriter writer(context, logger, pusher.GetAddress());
ZMQStream2Pusher pusher({"ipc://*"});
StreamWriter writer(logger, pusher.GetAddress()[0]);
auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer);
JFJochReceiverService service(aq_devices, logger, pusher);
@@ -241,11 +235,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_ROI", "[JFJochReceiver]") {
aq_devices.Add(std::move(test));
ZMQContext context;
context.NumThreads(4);
ZMQStream2Pusher pusher(context, "ipc://*");
StreamWriter writer(context, logger, pusher.GetAddress());
ZMQStream2Pusher pusher({"ipc://*"});
StreamWriter writer(logger, pusher.GetAddress()[0]);
auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer);
JFJochReceiverService service(aq_devices, logger, pusher);
@@ -269,6 +260,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_ROI", "[JFJochReceiver]") {
REQUIRE(plot.size() == 2);
CHECK(plot[0].title == "roi0");
REQUIRE(!plot[0].x.empty());
CHECK(plot[0].x[0] == 0);
CHECK(plot[0].y[0] == roi_value);
CHECK(plot[1].title == "roi1");

View File

@@ -58,7 +58,6 @@ TEST_CASE("PreviewImage_GenerateJPEG","[JPEG]") {
{.x = 1200, .y = 500, .indexed = true}
};
PreviewImage image(experiment);
image.UpdateImage(image_conv_2.data(), spots);
PreviewJPEGSettings preview_settings{
.saturation_value = 5,
@@ -66,6 +65,11 @@ TEST_CASE("PreviewImage_GenerateJPEG","[JPEG]") {
.show_spots = true
};
REQUIRE(image.GenerateJPEG(preview_settings).empty());
image.UpdateImage(image_conv_2.data(), spots);
std::string s;
REQUIRE_NOTHROW(s = image.GenerateJPEG(preview_settings));
std::ofstream f("lyso_diff.jpeg", std::ios::binary);

View File

@@ -4,15 +4,13 @@
#include <filesystem>
#include "../writer/StreamWriter.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#include "../receiver/JFJochReceiverService.h"
TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
RegisterHDF5Filter();
Logger logger("test");
ZMQContext context;
std::string zmq_addr = "ipc://*";
DiffractionExperiment x(DetectorGeometry(2));
x.FilePrefix("subdir/JFJochWriterTest").NumTriggers(1).ImagesPerTrigger(5)
@@ -23,7 +21,7 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
for (int i = 0; i < x.GetDataStreamsNum(); i++)
aq_devices.AddHLSDevice(64);
ZMQStream2PusherGroup pusher (context, {zmq_addr});
ZMQStream2Pusher pusher ({"ipc://*"});
JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher);
JFJochReceiverOutput receiver_output;
@@ -32,7 +30,7 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
REQUIRE(x.GetImageNum() == 5);
auto pusher_addr = pusher.GetAddress();
REQUIRE(pusher_addr.size() == 1);
REQUIRE_NOTHROW(writer = std::make_unique<StreamWriter>(context, logger, pusher_addr[0]));
REQUIRE_NOTHROW(writer = std::make_unique<StreamWriter>(logger, pusher_addr[0]));
CHECK (writer->GetStatistics().state == StreamWriterState::Idle);
REQUIRE_NOTHROW(fpga_receiver_service.Start(x, nullptr));

View File

@@ -3,7 +3,7 @@
#include <random>
#include <catch2/catch_all.hpp>
#include "../writer/ZMQImagePuller.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
void test_puller(ZMQImagePuller *puller,
const DiffractionExperiment& x,
@@ -53,7 +53,6 @@ void test_puller(ZMQImagePuller *puller,
TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
const size_t nframes = 256;
ZMQContext context;
Logger logger("test");
DiffractionExperiment x(DetectorGeometry(1));
x.Mode(DetectorMode::Raw);
@@ -71,12 +70,10 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
for (auto &i: image1) i = dist(g1);
std::string zmq_addr = "ipc://*";
// Puller needs to be declared first, but both objects need to exist till communication finished
// TODO: ImageSender should not allow if there are still completions to be done
ZMQImagePuller puller(context);
ZMQStream2PusherGroup pusher(context, {zmq_addr});
ZMQImagePuller puller;
ZMQStream2Pusher pusher({"ipc://*"});
std::vector<size_t> diff_size(1), diff_content(1), diff_split(1), nimages(1);
@@ -119,11 +116,87 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
REQUIRE(diff_content[0] == 0);
}
TEST_CASE("ZMQImageCommTest_1Writer_Preview","[ZeroMQ]") {
const size_t nframes = 1;
Logger logger("test");
DiffractionExperiment x(DetectorGeometry(1));
x.Mode(DetectorMode::Raw);
x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4)
.ImagesPerTrigger(nframes);
std::vector<DiffractionSpot> empty_spot_vector;
std::vector<float> empty_rad_int_profile;
REQUIRE(x.GetImageNum() == nframes);
std::mt19937 g1(1387);
std::uniform_int_distribution<uint16_t> dist;
std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
for (auto &i: image1) i = dist(g1);
// Puller needs to be declared first, but both objects need to exist till communication finished
ZMQImagePuller puller;
ZMQStream2Pusher pusher({"ipc://*"});
REQUIRE(pusher.GetPreviewAddress().empty());
pusher.PreviewSocket("ipc://*");
REQUIRE(!pusher.GetPreviewAddress().empty());
ZMQSocket preview_sub_socket(ZMQSocketType::Sub);
preview_sub_socket.Connect(pusher.GetPreviewAddress());
preview_sub_socket.Subscribe("");
std::vector<size_t> diff_size(1), diff_content(1), diff_split(1), nimages(1);
auto pusher_addr = pusher.GetAddress();
puller.Connect(pusher_addr[0]);
std::thread sender_thread = std::thread([&] {
std::vector<uint8_t> serialization_buffer(16*1024*1024);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
StartMessage message {
.images_per_file = 16,
.write_master_file = true
};
EndMessage end_message{};
pusher.StartDataCollection(message);
for (int i = 0; i < nframes; i++) {
DataMessage data_message;
data_message.number = i;
PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
serializer.SerializeImage(data_message);
pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i);
}
pusher.EndDataCollection(end_message);
});
std::thread puller_thread(test_puller, &puller, std::cref(x), std::cref(image1), 1, 0,
std::ref(diff_split), std::ref(diff_size), std::ref(diff_content),
std::ref(nimages));
sender_thread.join();
puller_thread.join();
puller.Disconnect();
REQUIRE(nimages[0] == nframes);
REQUIRE(diff_size[0] == 0);
REQUIRE(diff_content[0] == 0);
ZMQMessage msg;
REQUIRE(preview_sub_socket.Receive(msg));
REQUIRE(preview_sub_socket.Receive(msg));
REQUIRE(preview_sub_socket.Receive(msg));
}
TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
const size_t nframes = 256;
ZMQContext context;
Logger logger("test");
DiffractionExperiment x(DetectorGeometry(1));
x.Mode(DetectorMode::Raw);
@@ -148,7 +221,7 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
for (int i = 0; i < npullers; i++)
zmq_addr.push_back("ipc://*");
ZMQStream2PusherGroup pusher(context, zmq_addr);
ZMQStream2Pusher pusher(zmq_addr);
// Puller needs to be declared first, but both objects need to exist till communication finished
// TODO: ImageSender should not allow if there are still completions to be done
@@ -156,7 +229,7 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
auto pusher_addr = pusher.GetAddress();
REQUIRE(pusher_addr.size() == 2);
for (int i = 0; i < npullers; i++) {
puller.push_back(std::make_unique<ZMQImagePuller>(context));
puller.push_back(std::make_unique<ZMQImagePuller>());
puller[i]->Connect(pusher_addr[i]);
}
@@ -213,7 +286,6 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") {
const size_t nframes = 255;
ZMQContext context;
Logger logger("test");
DiffractionExperiment x(DetectorGeometry(1));
x.Mode(DetectorMode::Raw);
@@ -238,14 +310,14 @@ TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") {
for (int i = 0; i < npullers; i++)
zmq_addr.push_back("ipc://*");
ZMQStream2PusherGroup pusher(context, zmq_addr);
ZMQStream2Pusher pusher(zmq_addr);
auto pusher_addr = pusher.GetAddress();
REQUIRE(pusher_addr.size() == npullers);
// Puller needs to be declared first, but both objects need to exist till communication finished
// TODO: ImageSender should not allow if there are still completions to be done
std::vector<std::unique_ptr<ZMQImagePuller> > puller;
for (int i = 0; i < npullers; i++) {
puller.push_back(std::make_unique<ZMQImagePuller>(context));
puller.push_back(std::make_unique<ZMQImagePuller>());
puller[i]->Connect(pusher_addr[i]);
}
@@ -310,8 +382,7 @@ TEST_CASE("ZMQImagePuller_abort","[ZeroMQ]") {
x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4)
.ImagesPerTrigger(nframes);
ZMQContext context;
ZMQImagePuller puller(context);
ZMQImagePuller puller;
std::vector<size_t> diff_size(1), diff_content(1), diff_split(1), nimages(1);
std::vector<uint16_t> image1(x.GetPixelsNum());
@@ -325,7 +396,7 @@ TEST_CASE("ZMQImagePuller_abort","[ZeroMQ]") {
}
TEST_CASE("ZMQImageCommTest_NoWriter","[ZeroMQ]") {
ZMQStream2PusherGroup pusher({"ipc://*"});
ZMQStream2Pusher pusher({"ipc://*"});
StartMessage msg;
REQUIRE_THROWS(pusher.StartDataCollection(msg));

View File

@@ -6,7 +6,7 @@
#include "../common/Logger.h"
#include "../receiver/FrameTransformation.h"
#include "../common/RawToConvertedGeometry.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#define BASE_TCP_PORT 8000
@@ -54,7 +54,7 @@ int main(int argc, char **argv) {
for (int i = 0; i < nsockets; i++)
zmq_addr.emplace_back("tcp://0.0.0.0:" + std::to_string(BASE_TCP_PORT + i));
ZMQStream2PusherGroup pusher(context, zmq_addr);
ZMQStream2Pusher pusher(zmq_addr);
FrameTransformation transformation(x);

View File

@@ -1,8 +1,6 @@
FIND_PACKAGE(HDF5 1.10 REQUIRED)
ADD_LIBRARY(JFJochHDF5Wrappers STATIC HDF5Objects.cpp HDF5Objects.h ../compression/bitshuffle/bshuf_h5filter.c)
TARGET_LINK_LIBRARIES(JFJochHDF5Wrappers Compression ${HDF5_LIBRARIES})
TARGET_INCLUDE_DIRECTORIES(JFJochHDF5Wrappers PUBLIC ${HDF5_INCLUDE_DIRS})
TARGET_LINK_LIBRARIES(JFJochHDF5Wrappers Compression hdf5-static)
ADD_LIBRARY(JFJochWriter STATIC
HDF5DataFile.h HDF5DataFile.cpp
@@ -23,7 +21,7 @@ ADD_LIBRARY(JFJochWriter STATIC
HDF5DataFilePluginJUNGFRAU.h
)
TARGET_LINK_LIBRARIES(JFJochWriter JFJochHDF5Wrappers JFJochCommon CBORStream2FrameSerialize)
TARGET_LINK_LIBRARIES(JFJochWriter JFJochZMQ JFJochLogger JFJochHDF5Wrappers CBORStream2FrameSerialize)
ADD_EXECUTABLE(HDF5Sum HDF5Sum.cpp)
TARGET_LINK_LIBRARIES(HDF5Sum JFJochWriter)
@@ -36,4 +34,4 @@ TARGET_INCLUDE_DIRECTORIES(WriterAPI PUBLIC gen/model gen/api)
ADD_EXECUTABLE(jfjoch_writer jfjoch_writer.cpp JFJochWriterHttp.h JFJochWriterHttp.cpp)
TARGET_LINK_LIBRARIES(jfjoch_writer JFJochWriter WriterAPI)
INSTALL(TARGETS jfjoch_writer RUNTIME)
INSTALL(TARGETS jfjoch_writer RUNTIME COMPONENT writer)

View File

@@ -1,6 +1,8 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include <sys/stat.h>
#include <filesystem>
#include "HDF5DataFile.h"
#include "../compression/JFJochCompressor.h"
@@ -50,7 +52,9 @@ std::optional<HDF5DataFileStatistics> HDF5DataFile::Close() {
data_set.reset();
}
data_file.reset();
std::rename(tmp_filename.c_str(), filename.c_str());
if (!std::filesystem::exists(filename.c_str()))
std::rename(tmp_filename.c_str(), filename.c_str());
closed = true;

View File

@@ -36,7 +36,8 @@ NXmx::NXmx(const StartMessage &start)
}
NXmx::~NXmx() {
std::rename(tmp_filename.c_str(), filename.c_str());
if (!std::filesystem::exists(filename.c_str()))
std::rename(tmp_filename.c_str(), filename.c_str());
}
std::string HDF5Metadata::DataFileName(const std::string &prefix, int64_t file_number) {
@@ -115,6 +116,9 @@ void NXmx::Detector(const StartMessage &start) {
"ns");
}
if (start.data_reduction_factor_serialmx)
det_specific.SaveScalar("data_reduction_factor_serialmx", start.data_reduction_factor_serialmx.value());
if (!start.gain_file_names.empty())
det_specific.SaveVector("gain_file_names", start.gain_file_names);

View File

@@ -74,8 +74,8 @@ void HDF5Writer::AddStats(const std::optional<HDF5DataFileStatistics>& s) {
}
}
void HDF5Writer::SetupSocket(ZMQContext &c, const std::string &addr) {
socket = std::make_unique<ZMQSocket>(c, ZMQSocketType::Pub);
void HDF5Writer::SetupSocket(const std::string &addr) {
socket = std::make_unique<ZMQSocket>(ZMQSocketType::Pub);
socket->Bind(addr);
}

View File

@@ -25,7 +25,7 @@ public:
explicit HDF5Writer(const StartMessage &request);
void Write(const DataMessage& msg);
std::vector<HDF5DataFileStatistics> Finalize();
void SetupSocket(ZMQContext &c, const std::string &addr);
void SetupSocket(const std::string &addr);
std::optional<std::string> GetZMQAddr();
};

View File

@@ -6,13 +6,11 @@
#include "HDF5NXmx.h"
#include "MakeDirectory.h"
StreamWriter::StreamWriter(ZMQContext &in_context,
Logger &in_logger,
StreamWriter::StreamWriter(Logger &in_logger,
const std::string &zmq_addr,
const std::string &repub_address,
const std::string &in_file_done_address)
: zmq_context(in_context),
image_puller(in_context, repub_address),
: image_puller(repub_address),
logger(in_logger),
file_done_address(in_file_done_address) {
image_puller.Connect(zmq_addr);
@@ -46,7 +44,7 @@ void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
HDF5Writer writer(*image_puller_output.cbor->start_message);
if (!file_done_address.empty())
writer.SetupSocket(zmq_context, file_done_address);
writer.SetupSocket(file_done_address);
std::unique_ptr<NXmx> master_file;
if (!image_puller_output.cbor->start_message->write_master_file || image_puller_output.cbor->start_message->write_master_file.value())

View File

@@ -24,7 +24,6 @@ struct StreamWriterOutput {
};
class StreamWriter {
ZMQContext &zmq_context;
std::string file_done_address;
StreamWriterState state = StreamWriterState::Idle;
@@ -42,8 +41,7 @@ class StreamWriter {
void CollectImages(std::vector<HDF5DataFileStatistics> &v);
bool WaitForImage();
public:
StreamWriter(ZMQContext& context,
Logger &logger,
StreamWriter(Logger &logger,
const std::string& zmq_addr,
const std::string& repub_address = "",
const std::string& file_done_address = "");

View File

@@ -2,13 +2,13 @@
#include "ZMQImagePuller.h"
ZMQImagePuller::ZMQImagePuller(ZMQContext &context, const std::string &repub_address) :
socket (context, ZMQSocketType::Pull) {
ZMQImagePuller::ZMQImagePuller(const std::string &repub_address) :
socket (ZMQSocketType::Pull) {
socket.ReceiveWaterMark(ReceiverWaterMark);
socket.ReceiveTimeout(ReceiveTimeout);
if (!repub_address.empty()) {
repub_socket = std::make_unique<ZMQSocket>(context, ZMQSocketType::Push);
repub_socket = std::make_unique<ZMQSocket>(ZMQSocketType::Push);
repub_socket->SendWaterMark(100);
repub_socket->SendTimeout(std::chrono::milliseconds(100));
repub_socket->Bind(repub_address);

View File

@@ -42,7 +42,7 @@ class ZMQImagePuller {
void RepubThread();
Logger logger{"ZMQImagePuller"};
public:
explicit ZMQImagePuller(ZMQContext &context, const std::string &repub_address = "");
explicit ZMQImagePuller(const std::string &repub_address = "");
~ZMQImagePuller();
void Connect(const std::string &in_address);
void Disconnect();

View File

@@ -117,7 +117,7 @@ int main(int argc, char **argv) {
ZMQContext context;
Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(http_port));
writer = new StreamWriter(context, logger, argv[first_argc], repub_zmq_addr, file_done_zmq_addr);
writer = new StreamWriter(logger, argv[first_argc], repub_zmq_addr, file_done_zmq_addr);
httpEndpoint = new Pistache::Http::Endpoint(addr);
auto router = std::make_shared<Pistache::Rest::Router>();