jfjoch_writer: Not dependent on gRPC

This commit is contained in:
2023-11-15 10:40:03 +01:00
parent 049dffe91e
commit e8d576a563
20 changed files with 110 additions and 609 deletions

View File

@@ -3,7 +3,7 @@ ADD_LIBRARY(JFJochBroker STATIC
JFJochServices.cpp JFJochServices.h
JFJochBroker.cpp JFJochBroker.h JFJochBrokerParser.cpp JFJochBrokerParser.h)
TARGET_LINK_LIBRARIES(JFJochBroker JFJochReceiver JFJochDetector gRPCClients CommonFunctions JFJochProtoBuf)
TARGET_LINK_LIBRARIES(JFJochBroker JFJochReceiver JFJochDetector CommonFunctions JFJochProtoBuf)
ADD_EXECUTABLE(jfjoch_broker jfjoch_broker.cpp)
TARGET_LINK_LIBRARIES(jfjoch_broker JFJochBroker)

View File

@@ -234,12 +234,6 @@ void ParseBrokerConfiguration(const nlohmann::json &input, const std::string& ta
if (CHECK_OBJECT(input, tag)) {
auto j = input[tag];
if (CHECK_ARRAY(j, "writer")) {
for (const auto &iter: j["writer"]) {
broker.Services().Writer(GET_STR(iter, "addr_grpc"), GET_STR(iter, "addr_zmq"));
}
}
if (j.contains("detector"))
broker.Services().Detector();
} else

View File

@@ -8,13 +8,6 @@ JFJochServices::JFJochServices(Logger &in_logger) : logger(in_logger) {}
void JFJochServices::Start(const DiffractionExperiment& experiment, const JFCalibration &calibration) {
logger.Info("Measurement start for: {}", experiment.GetFilePrefix());
if ((experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty())) {
logger.Info(" ... writer start");
writer.Start(writer_zmq_addr, 0);
writer_running = true;
} else
writer_running = false;
if (receiver != nullptr) {
logger.Info(" ... receiver start");
if (experiment.GetDetectorMode() == DetectorMode::Conversion)
@@ -72,20 +65,6 @@ JFJochServicesOutput JFJochServices::Stop(const JFCalibration &calibration) {
logger.Info("Receiver finished with success");
}
if (writer_running) {
logger.Info("Stopping writer");
try {
auto stats = writer.Stop();
logger.Info(" ... finished with success");
for (int i = 0; i < stats.size(); i++)
logger.Info("Writer {}: Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz",
i, stats[i].nimages(), stats[i].performance_mbs(), stats[i].performance_hz());
} catch (JFJochException &e) {
logger.Error(" ... finished with error {}",e.what());
exception = std::make_unique<JFJochException>(e);
}
}
if (detector) {
logger.Info("Stopping detector");
try {
@@ -106,7 +85,7 @@ JFJochServicesOutput JFJochServices::Stop(const JFCalibration &calibration) {
void JFJochServices::Abort() {
// Abort should try to achieve the best outcome possible
// but it OK if things fail (for example lost connection)
// but it is OK if things fail (for example lost connection)
try {
if (receiver != nullptr)
receiver->Abort();
@@ -127,13 +106,6 @@ JFJochServices &JFJochServices::Receiver(JFJochReceiverService *input) {
return *this;
}
JFJochServices &JFJochServices::Writer(const std::string &addr, const std::string &zmq_push_addr) {
writer.AddClient(addr);
writer_zmq_addr.push_back(zmq_push_addr);
logger.Info("Using writer service with gRPC {} listening for images from ZeroMQ {}", addr, zmq_push_addr);
return *this;
}
JFJochServices &JFJochServices::Detector() {
detector = std::make_unique<DetectorWrapper>();
logger.Info("Using detector service");
@@ -177,7 +149,3 @@ void JFJochServices::Trigger() {
if (detector)
detector->Trigger();
}
size_t JFJochServices::WriterZMQCount() const {
return writer_zmq_addr.size();
}

View File

@@ -7,7 +7,6 @@
#include "../jungfrau/JFCalibration.h"
#include "../common/Logger.h"
#include "../receiver/JFJochReceiverService.h"
#include "../grpc/JFJochWriterGroupClient.h"
#include "../detector_control/DetectorWrapper.h"
struct JFJochServicesOutput {
@@ -16,12 +15,9 @@ struct JFJochServicesOutput {
class JFJochServices {
JFJochReceiverService *receiver = nullptr;
JFJochWriterGroupClient writer;
std::unique_ptr<DetectorWrapper> detector;
Logger &logger;
bool writer_running = false;
std::vector<std::string> writer_zmq_addr;
public:
explicit JFJochServices(Logger &in_logger);
void On(const DiffractionExperiment& experiment);
@@ -38,10 +34,7 @@ public:
void SetDataProcessingSettings(const DataProcessingSettings &settings);
JFJochServices& Receiver(JFJochReceiverService *input);
JFJochServices& Writer(const std::string &addr, const std::string &zmq_push_addr);
JFJochServices& Detector();
size_t WriterZMQCount() const;
};

View File

@@ -8,6 +8,8 @@
#include <future>
#include <optional>
#include <jfjoch.pb.h>
#include "../common/DiffractionExperiment.h"
#include "../jungfrau/JFCalibration.h"
#include "../common/Logger.h"

View File

@@ -45,9 +45,3 @@ ADD_DEPENDENCIES(JFJochProtoBuf jfjoch-grpc-python)
TARGET_INCLUDE_DIRECTORIES(JFJochProtoBuf PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
TARGET_LINK_LIBRARIES(JFJochProtoBuf ${_GRPC_GRPCPP})
ADD_LIBRARY(gRPCClients STATIC
JFJochWriterClient.cpp JFJochWriterClient.h
JFJochWriterGroupClient.cpp JFJochWriterGroupClient.h)
TARGET_LINK_LIBRARIES(gRPCClients CommonFunctions JFJochProtoBuf)

View File

@@ -66,8 +66,6 @@ message Plot {
repeated float y = 2;
}
// DiffractionExperiment
message DatasetSettings {
int64 images_per_trigger = 1;
int64 ntrigger = 2;
@@ -111,8 +109,6 @@ message DetectorSettings {
int64 storage_cell_delay_ns = 10;
}
// Calibration
message ModuleStatistics {
int64 module_number = 1;
int64 storage_cell_number = 2;
@@ -132,7 +128,6 @@ message JFCalibrationStatistics {
repeated ModuleStatistics module_statistics = 1;
}
// Receiver
enum PlotType {
BKG_ESTIMATE = 0;
RAD_INT = 1;
@@ -156,25 +151,6 @@ message RadialIntegrationProfiles {
repeated RadialIntegrationProfile profiles = 1;
}
// Writer
message WriterInput {
string zmq_receiver_address = 1;
int64 series_id = 2;
}
message DataFileStatistics {
string name = 1;
int64 nimages = 2;
}
message WriterOutput {
int64 nimages = 1;
float performance_MBs = 2;
float performance_Hz = 3;
repeated DataFileStatistics file_statistics = 4;
}
message DataProcessingSettings {
float signal_to_noise_threshold = 1; // STRONG_PIXEL in XDS
int64 photon_count_threshold = 2; // Threshold in photon counts
@@ -188,8 +164,6 @@ message DataProcessingSettings {
bool preview_indexed_only = 10;
}
// Broker
message Image {
bytes data = 1;
int64 width = 2;
@@ -244,12 +218,6 @@ message DetectorSelection {
int64 id = 1;
}
service gRPC_JFJochWriter {
rpc Start (WriterInput) returns (Empty) {}
rpc Abort (Empty) returns (Empty) {}
rpc Stop (Empty) returns (WriterOutput) {}
}
service gRPC_JFJochBroker {
rpc Start (DatasetSettings) returns (Empty) {}
rpc Stop (Empty) returns (Empty) {}

File diff suppressed because one or more lines are too long

View File

@@ -5,133 +5,6 @@ import grpc
import jfjoch_pb2 as jfjoch__pb2
class gRPC_JFJochWriterStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Start = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochWriter/Start',
request_serializer=jfjoch__pb2.WriterInput.SerializeToString,
response_deserializer=jfjoch__pb2.Empty.FromString,
)
self.Abort = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochWriter/Abort',
request_serializer=jfjoch__pb2.Empty.SerializeToString,
response_deserializer=jfjoch__pb2.Empty.FromString,
)
self.Stop = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochWriter/Stop',
request_serializer=jfjoch__pb2.Empty.SerializeToString,
response_deserializer=jfjoch__pb2.WriterOutput.FromString,
)
class gRPC_JFJochWriterServicer(object):
"""Missing associated documentation comment in .proto file."""
def Start(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Abort(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Stop(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_gRPC_JFJochWriterServicer_to_server(servicer, server):
rpc_method_handlers = {
'Start': grpc.unary_unary_rpc_method_handler(
servicer.Start,
request_deserializer=jfjoch__pb2.WriterInput.FromString,
response_serializer=jfjoch__pb2.Empty.SerializeToString,
),
'Abort': grpc.unary_unary_rpc_method_handler(
servicer.Abort,
request_deserializer=jfjoch__pb2.Empty.FromString,
response_serializer=jfjoch__pb2.Empty.SerializeToString,
),
'Stop': grpc.unary_unary_rpc_method_handler(
servicer.Stop,
request_deserializer=jfjoch__pb2.Empty.FromString,
response_serializer=jfjoch__pb2.WriterOutput.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'JFJochProtoBuf.gRPC_JFJochWriter', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class gRPC_JFJochWriter(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def Start(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochWriter/Start',
jfjoch__pb2.WriterInput.SerializeToString,
jfjoch__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def Abort(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochWriter/Abort',
jfjoch__pb2.Empty.SerializeToString,
jfjoch__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def Stop(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochWriter/Stop',
jfjoch__pb2.Empty.SerializeToString,
jfjoch__pb2.WriterOutput.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
class gRPC_JFJochBrokerStub(object):
"""Missing associated documentation comment in .proto file."""

View File

@@ -1,11 +1,11 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include <catch2/catch.hpp>
#include <jfjoch.pb.h>
#include "../grpc/gRPCServer_Template.h"
#include "../broker/JFJochStateMachine.h"
#include "../writer/JFJochWriterService.h"
#include "../receiver/JFJochReceiverService.h"
#include "../writer/StreamWriter.h"
#include "FPGAUnitTest.h"
#include "../acquisition_device/MockAcquisitionDevice.h"
#include "../common/ZMQImagePusher.h"
@@ -40,15 +40,14 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver);
services.Receiver(&fpga_receiver);
logger.Verbose(true);
std::vector<uint16_t> image(RAW_MODULE_SIZE);
JFJochWriterService writer(zmq_context, logger);
auto writer_server = gRPCServer("unix:writer_test", writer);
StreamWriter writer(zmq_context, logger, "inproc://#1");
auto writer_future = writer.RunFuture();
REQUIRE_NOTHROW(state_machine.Initialize());
logger.Info("Initialized");
@@ -95,10 +94,9 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") {
REQUIRE(statistics.detector_width() == 2068);
REQUIRE(statistics.detector_height() == 2164);
REQUIRE(statistics.detector_pixel_depth() == 2);
writer_server->Shutdown();
writer_future.get();
}
TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") {
Logger logger("JFJochIntegrationTest_ZMQ_save_calibration");
ZMQContext zmq_context;
@@ -127,15 +125,14 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver);
services.Receiver(&fpga_receiver);
logger.Verbose(true);
std::vector<uint16_t> image(RAW_MODULE_SIZE);;
JFJochWriterService writer(zmq_context, logger);
auto writer_server = gRPCServer("unix:writer_test", writer);
StreamWriter writer(zmq_context, logger, "inproc://#1");
auto writer_future = writer.RunFuture();
REQUIRE_NOTHROW(state_machine.Initialize());
logger.Info("Initialized");
@@ -183,7 +180,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") {
REQUIRE(statistics.detector_width() == 2068);
REQUIRE(statistics.detector_height() == 2164);
REQUIRE(statistics.detector_pixel_depth() == 2);
writer_server->Shutdown();
REQUIRE_NOTHROW(writer_future.get());
}
TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]") {
@@ -215,15 +212,14 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]")
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver);
services.Receiver(&fpga_receiver);
logger.Verbose(true);
std::vector<uint16_t> image(RAW_MODULE_SIZE);
JFJochWriterService writer(zmq_context, logger);
auto writer_server = gRPCServer("unix:writer_test", writer);
StreamWriter writer(zmq_context, logger, "inproc://#1");
auto writer_future = writer.RunFuture();
REQUIRE_NOTHROW(state_machine.Initialize());
logger.Info("Initialized");
@@ -271,7 +267,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]")
REQUIRE(statistics.detector_height() == 2164);
REQUIRE(statistics.detector_pixel_depth() == 2);
writer_server->Shutdown();
REQUIRE_NOTHROW(writer_future.get());
}
TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") {
@@ -292,7 +288,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().Mode(DetectorMode::Conversion);
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -309,9 +304,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") {
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
auto writer_server = gRPCServer("unix:writer_test", writer);
StreamWriter writer(zmq_context, logger, "inproc://#1");
auto writer_future = writer.RunFuture();
JFJochProtoBuf::DetectorSettings detector_settings;
detector_settings.set_frame_time_us(500);
@@ -362,8 +356,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") {
REQUIRE(statistics.detector_height() == 8 * 512);
REQUIRE(statistics.detector_pixel_depth() == 2);
writer_server->Shutdown();
REQUIRE_NOTHROW(writer_future.get());
}
@@ -383,10 +376,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services
.Writer("unix:writer_test_0", "inproc://#0")
.Writer("unix:writer_test_1", "inproc://#1")
.Writer("unix:writer_test_2", "inproc://#2");
logger.Verbose(true);
@@ -403,13 +392,14 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") {
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer_0(zmq_context, logger);
JFJochWriterService writer_1(zmq_context, logger);
JFJochWriterService writer_2(zmq_context, logger);
StreamWriter writer_0(zmq_context, logger, "inproc://#0");
auto writer_0_future = writer_0.RunFuture();
auto writer_server_0 = gRPCServer("unix:writer_test_0", writer_0);
auto writer_server_1 = gRPCServer("unix:writer_test_1", writer_1);
auto writer_server_2 = gRPCServer("unix:writer_test_2", writer_2);
StreamWriter writer_1(zmq_context, logger, "inproc://#1");
auto writer_1_future = writer_1.RunFuture();
StreamWriter writer_2(zmq_context, logger, "inproc://#2");
auto writer_2_future = writer_2.RunFuture();
REQUIRE_NOTHROW(state_machine.Initialize());
logger.Info("Initialized");
@@ -451,13 +441,11 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == nimages);
writer_server_0->Shutdown();
writer_server_1->Shutdown();
writer_server_2->Shutdown();
REQUIRE_NOTHROW(writer_0_future.get());
REQUIRE_NOTHROW(writer_1_future.get());
REQUIRE_NOTHROW(writer_2_future.get());
}
TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") {
Logger logger("JFJochIntegrationTest_Cancel");
@@ -475,7 +463,6 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -497,9 +484,8 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") {
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
auto writer_server = gRPCServer("unix:writer_test", writer);
StreamWriter writer(zmq_context, logger, "inproc://#1");
auto writer_future = writer.RunFuture();
REQUIRE_NOTHROW(state_machine.Initialize());
logger.Info("Initialized");
@@ -529,8 +515,7 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") {
REQUIRE(statistics.max_image_number_sent() == 4);
REQUIRE(statistics.cancelled());
writer_server->Shutdown();
REQUIRE_NOTHROW(writer_future.get());
}
TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
@@ -549,7 +534,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0).PreviewPeriod(
5ms);
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -573,7 +557,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
StreamWriter writer(zmq_context, logger, "inproc://#1");
auto writer_future = writer.RunFuture();
ZMQPreviewPublisher preview(zmq_context, "inproc://#2");
fpga_receiver.PreviewPublisher(&preview).NumThreads(1);
@@ -582,8 +567,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
REQUIRE_NOTHROW(rcv_preview_socket.Connect("inproc://#2"));
rcv_preview_socket.SubscribeAll();
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
logger.Info("Initialized");
@@ -633,8 +616,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
writer_server->Shutdown();
REQUIRE_NOTHROW(writer_future.get());
}
TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]") {
@@ -653,7 +635,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0).PreviewPeriod(
5ms);
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -676,8 +657,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
ZMQPreviewPublisher preview(zmq_context, "inproc://#2");
fpga_receiver.PreviewPublisher(&preview).NumThreads(1);
@@ -685,8 +664,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
REQUIRE_NOTHROW(rcv_preview_socket.Connect("inproc://#2"));
rcv_preview_socket.SubscribeAll();
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
logger.Info("Initialized");
@@ -734,8 +711,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
writer_server->Shutdown();
}
TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") {
@@ -753,7 +728,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") {
state_machine.AddDetectorSetup(DetectorGeometry(ndatastream * nmodules, 2, 8, 36));
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -796,9 +770,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") {
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
auto writer_server = gRPCServer("unix:writer_test", writer);
StreamWriter writer(zmq_context, logger, "inproc://#1");
auto writer_future = writer.RunFuture();
REQUIRE_NOTHROW(state_machine.Initialize());
logger.Info("Initialized");
@@ -842,8 +815,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 1);
writer_server->Shutdown();
REQUIRE_NOTHROW(writer_future.get());
}
/*
TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver]") {
@@ -979,7 +952,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
state_machine.NotThreadSafe_Experiment().LowQForRadialInt_recipA(0.5).HighQForRadialInt_recipA(3.5)
.QSpacingForRadialInt_recipA(1.0);
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -1027,8 +999,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") {
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
auto writer_server = gRPCServer("unix:writer_test", writer);
StreamWriter writer(zmq_context, logger, "inproc://#1");
auto writer_future = writer.RunFuture();
REQUIRE_NOTHROW(state_machine.Initialize());
logger.Info("Initialized");
@@ -1070,8 +1042,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") {
CHECK(plot_map[0].title() == "dataset");
CHECK(plot_map[0].plot().x_size() == 3);
writer_server->Shutdown();
REQUIRE_NOTHROW(writer_future.get());
}
/*

View File

@@ -3,12 +3,9 @@
#include <catch2/catch.hpp>
#include <filesystem>
#include "../writer/JFJochWriterService.h"
#include "../writer/StreamWriter.h"
#include "../writer/HDF5Objects.h"
#include "../common/ZMQImagePusher.h"
#include "../receiver/JFJochReceiverService.h"
#include "../acquisition_device/HLSSimulatedDevice.h"
TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
RegisterHDF5Filter();
@@ -17,9 +14,6 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
ZMQContext context;
std::string zmq_addr = "inproc://#1";
grpc::ServerContext grpc_context;
JFJochProtoBuf::Empty empty;
DiffractionExperiment x(DetectorGeometry(2));
x.FilePrefix("subdir/JFJochWriterTest").NumTriggers(1).ImagesPerTrigger(5)
.UseInternalPacketGenerator(true).Mode(DetectorMode::Raw).PedestalG0Frames(0);
@@ -62,47 +56,3 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
REQUIRE(std::filesystem::remove("subdir/JFJochWriterTest_data_000.h5"));
REQUIRE(std::filesystem::remove("subdir"));
}
TEST_CASE("JFJochWriterServiceTest_ZMQ","[JFJochWriter]") {
RegisterHDF5Filter();
Logger logger("test");
ZMQContext context;
std::string zmq_addr = "inproc://#1";
JFJochWriterService writer(context, logger);
grpc::ServerContext grpc_context;
JFJochProtoBuf::Empty empty;
DiffractionExperiment x(DetectorGeometry(2));
x.FilePrefix("JFJochWriterTest").NumTriggers(1).ImagesPerTrigger(5)
.UseInternalPacketGenerator(true)
.Mode(DetectorMode::Raw).PedestalG0Frames(0);
JFModuleGainCalibration empty_gain;
AcquisitionDeviceGroup aq_devices;
for (int i = 0; i < x.GetDataStreamsNum(); i++)
aq_devices.AddHLSDevice(64);
ZMQImagePusher pusher (context, {zmq_addr});
JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher);
JFJochProtoBuf::WriterInput writer_input;
writer_input.set_zmq_receiver_address(zmq_addr);
JFJochReceiverOutput receiver_output;
JFJochProtoBuf::WriterOutput writer_output;
REQUIRE(x.GetImageNum() == 5);
REQUIRE(writer.Start(&grpc_context, &writer_input, &empty).ok());
REQUIRE_NOTHROW(fpga_receiver_service.Start(x, nullptr));
REQUIRE_NOTHROW(receiver_output = fpga_receiver_service.Stop());
REQUIRE(writer.Stop(&grpc_context, &empty, &writer_output).ok());
REQUIRE(writer_output.nimages() == 5);
//TODO: Check contest of HDF5 file
}

View File

@@ -17,5 +17,8 @@ TARGET_LINK_LIBRARIES(PreviewTest JFJochWriter CommonFunctions)
ADD_EXECUTABLE(JFCalibrationPerfTest JFCalibrationPerfTest.cpp)
TARGET_LINK_LIBRARIES(JFCalibrationPerfTest CommonFunctions)
ADD_EXECUTABLE(jfjoch_writer_test jfjoch_writer_test.cpp)
TARGET_LINK_LIBRARIES(jfjoch_writer_test JFJochWriter CommonFunctions)
INSTALL(TARGETS jfjoch_udp_simulator CompressionBenchmark HDF5DatasetWriteTest DataAnalysisPerfTest
PreviewTest JFCalibrationPerfTest RUNTIME)
PreviewTest jfjoch_writer_test JFCalibrationPerfTest RUNTIME)

View File

@@ -2,27 +2,28 @@
#include <cstdlib>
#include <iostream>
#include "HDF5Objects.h"
#include "../writer/HDF5Objects.h"
#include "../common/Logger.h"
#include "../common/FrameTransformation.h"
#include "../common/RawToConvertedGeometry.h"
#include "../common/ZMQImagePusher.h"
#include "../grpc/JFJochWriterGroupClient.h"
#define BASE_TCP_PORT 8000
int main(int argc, char **argv) {
Logger logger("jfjoch_writer_test");
RegisterHDF5Filter();
if (argc < 4) {
std::cout << "Usage: ./jfjoch_writer_test <JF4M hdf5 file> <#images> <writer gRPC address(es)>" << std::endl;
if (argc != 4) {
std::cout << "Usage: ./jfjoch_writer_test <JF4M hdf5 file> <#images> <# of sockets>" << std::endl;
std::cout << std::endl;
exit(EXIT_FAILURE);
}
int64_t nimages_out = atoi(argv[2]);
int64_t nsockets = atoi(argv[3]);
DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36));
x.Summation(1).ImagesPerTrigger(nimages_out).Mode(DetectorMode::Conversion);
@@ -49,14 +50,11 @@ int main(int argc, char **argv) {
ZMQContext context;
context.NumThreads(4);
JFJochWriterGroupClient client;
std::vector<std::string> zmq_addr;
for (int i = 3; i < argc; i++) {
for (int i = 0; i < nsockets; i++)
zmq_addr.emplace_back("tcp://0.0.0.0:" + std::to_string(BASE_TCP_PORT + i));
client.AddClient(argv[i]);
}
x.DataFileCount(zmq_addr.size());
x.DataFileCount(nsockets);
ZMQImagePusher pusher(context, zmq_addr);
@@ -80,14 +78,11 @@ int main(int argc, char **argv) {
for (int j = 0; j < x.GetModulesNum(); j++)
transformation.ProcessModule(image_tmp_raw.data() + j * RAW_MODULE_SIZE, j, 0);
transformation.Pack();
output_size[i] = transformation.SaveCompressedImage((uint8_t *) output[i].data());
}
logger.Info("Sending {} images", nimages_out);
client.Start(zmq_addr, 0);
std::vector<DiffractionSpot> empty_spot_vector;
std::vector<uint8_t> send_buffer(x.GetPixelsNum() * x.GetPixelDepth() * 2);
JFJochFrameSerializer serializer(send_buffer.data(), send_buffer.size());
@@ -95,7 +90,9 @@ int main(int argc, char **argv) {
StartMessage start_message;
x.FillMessage(start_message);
pusher.StartDataCollection(start_message);
serializer.SerializeSequenceStart(start_message);
pusher.StartDataCollection(send_buffer.data(), serializer.GetBufferSize(), x.GetDataFileCount());
for (int i = 0; i < nimages_out; i++) {
DataMessage data_message;
@@ -109,9 +106,4 @@ int main(int argc, char **argv) {
pusher.EndDataCollection(end_message);
logger.Info("Sending done");
auto stats = client.Stop();
logger.Info("Writing done");
for (int i = 0; i < stats.size(); i++)
logger.Info("Writer {}: Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz",
i, stats[i].nimages(), stats[i].performance_mbs(), stats[i].performance_hz());
}

View File

@@ -5,25 +5,19 @@ TARGET_LINK_LIBRARIES(HDF5Wrappers Compression ${HDF5_LIBRARIES})
TARGET_INCLUDE_DIRECTORIES(HDF5Wrappers PUBLIC ${HDF5_INCLUDE_DIRS})
ADD_EXECUTABLE(jfjoch_writer jfjoch_writer.cpp)
ADD_EXECUTABLE(jfjoch_writer_multi jfjoch_writer_multi.cpp)
ADD_LIBRARY(JFJochWriter STATIC
HDF5DataFile.h HDF5DataFile.cpp
HDF5NXmx.cpp HDF5NXmx.h
HDF5Writer.cpp HDF5Writer.h
ZMQImagePuller.cpp ZMQImagePuller.h
StreamWriter.cpp StreamWriter.h
JFJochWriterService.cpp JFJochWriterService.h)
StreamWriter.cpp StreamWriter.h)
TARGET_LINK_LIBRARIES(JFJochWriter HDF5Wrappers CommonFunctions JFJochProtoBuf)
TARGET_LINK_LIBRARIES(JFJochWriter HDF5Wrappers CommonFunctions JFJochProtoBuf)
TARGET_LINK_LIBRARIES(jfjoch_writer JFJochWriter)
TARGET_LINK_LIBRARIES(jfjoch_writer_multi JFJochWriter)
ADD_EXECUTABLE(jfjoch_writer_test jfjoch_writer_test.cpp)
TARGET_LINK_LIBRARIES(jfjoch_writer_test JFJochWriter gRPCClients CommonFunctions)
INSTALL(TARGETS jfjoch_writer jfjoch_writer_multi jfjoch_writer_test RUNTIME)
INSTALL(TARGETS jfjoch_writer RUNTIME)
ADD_EXECUTABLE(HDF5Sum HDF5Sum.cpp)
TARGET_LINK_LIBRARIES(HDF5Sum HDF5Wrappers)

View File

@@ -1,82 +0,0 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include "JFJochWriterService.h"
#include <filesystem>
JFJochWriterService::JFJochWriterService(ZMQContext &in_context, Logger &in_logger) :
logger(in_logger), zmq_context(in_context) {}
grpc::Status JFJochWriterService::Start(grpc::ServerContext *context, const JFJochProtoBuf::WriterInput *request,
JFJochProtoBuf::Empty *response) {
std::unique_lock<std::shared_mutex> ul(m);
try {
if (writer)
writer.reset();
logger.Info("Starting writer");
writer = std::make_unique<StreamWriter>(zmq_context, logger, request->zmq_receiver_address());
writer_future = std::async(std::launch::async, &StreamWriter::Run, writer.get());
logger.Info(" ... done.");
return grpc::Status::OK;
} catch (JFJochException &e) {
logger.ErrorException(e);
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochWriterService::Stop(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::WriterOutput *response) {
try {
{
std::shared_lock<std::shared_mutex> ul(m);
logger.Info("Stopping writer");
if (writer_future.valid()) {
auto ret = writer_future.get();
response->set_nimages(ret.image_puller_stats.processed_images);
response->set_performance_mbs(ret.image_puller_stats.performance_MBs);
response->set_performance_hz(ret.image_puller_stats.performance_Hz);
for (const auto &f: ret.data_file_stats) {
auto *tmp = response->add_file_statistics();
tmp->set_name(f.filename);
tmp->set_nimages(f.max_image_number + 1);
}
} else {
response->set_nimages(0);
response->set_performance_mbs(0);
response->set_performance_hz(0);
}
}
{
std::unique_lock<std::shared_mutex> ul(m);
if (writer)
writer.reset();
}
logger.Info("Done");
return grpc::Status::OK;
} catch (JFJochException &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochWriterService::Abort(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::Empty *response) {
std::shared_lock<std::shared_mutex> ul(m);
if (writer)
writer->Abort();
return grpc::Status::OK;
}
JFJochWriterService &JFJochWriterService::BaseDirectory(const std::string &input) {
logger.Info("Setting base directory to " + input);
try {
std::filesystem::current_path(input);
} catch (const std::filesystem::filesystem_error& err) {
logger.Error("Cannot set base directory: " + std::string(err.what()));
throw JFJochException(JFJochExceptionCategory::FileWriteError,
"Cannot set base directory " + std::string(err.what()));
}
return *this;
}

View File

@@ -1,28 +0,0 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_JFJOCHWRITERSERVICE_H
#define JUNGFRAUJOCH_JFJOCHWRITERSERVICE_H
#include <shared_mutex>
#include <jfjoch.grpc.pb.h>
#include "StreamWriter.h"
class JFJochWriterService final : public JFJochProtoBuf::gRPC_JFJochWriter::Service {
std::shared_mutex m;
std::unique_ptr<StreamWriter> writer;
Logger &logger;
ZMQContext &zmq_context;
std::future<StreamWriterStatistics> writer_future;
public:
JFJochWriterService(ZMQContext &in_context, Logger &in_logger);
JFJochWriterService &BaseDirectory(const std::string& input);
grpc::Status Start(grpc::ServerContext *context, const JFJochProtoBuf::WriterInput *request,
JFJochProtoBuf::Empty *response) override;
grpc::Status Stop(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::WriterOutput *response) override;
grpc::Status Abort(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::Empty *response) override;
};
#endif //JUNGFRAUJOCH_JFJOCHWRITERSERVICE_H

View File

@@ -51,8 +51,9 @@ void StreamWriter::Abort() {
image_puller.Abort();
}
StreamWriterStatistics StreamWriter::Run() {
StreamWriterStatistics StreamWriter::Run() {
StreamWriterStatistics ret;
start_message = StartMessage();
StartDataCollection();
try {
CollectImages(ret.data_file_stats);
@@ -68,3 +69,7 @@ StreamWriterStatistics StreamWriter::Run() {
ret.image_puller_stats.processed_images, ret.image_puller_stats.performance_MBs, ret.image_puller_stats.performance_Hz);
return ret;
}
std::future<StreamWriterStatistics> StreamWriter::RunFuture() {
return std::async(std::launch::async, &StreamWriter::Run, this);
}

View File

@@ -3,6 +3,8 @@
#ifndef JUNGFRAUJOCH_STREAMWRITER_H
#define JUNGFRAUJOCH_STREAMWRITER_H
#include <future>
#include "ZMQImagePuller.h"
#include "HDF5DataFile.h"
@@ -21,7 +23,8 @@ class StreamWriter {
void EndDataCollection();
public:
StreamWriter(ZMQContext& context, Logger &logger, const std::string& zmq_addr);
StreamWriterStatistics Run();
StreamWriterStatistics Run();
std::future<StreamWriterStatistics> RunFuture();
void Abort();
};

View File

@@ -1,41 +1,23 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include <fstream>
#include <nlohmann/json.hpp>
#include "HDF5Objects.h"
#include "../grpc/gRPCServer_Template.h"
#include "../common/Logger.h"
#include "JFJochWriterService.h"
#include "StreamWriter.h"
volatile bool quitok = false;
int main(int argc, char **argv) {
RegisterHDF5Filter();
Logger logger("jfjoch_writer");
if (argc < 3) {
logger.Error("Usage ./jfjoch_writer <json config file> <gRPC port>");
exit(EXIT_FAILURE);
}
std::string grpc_addr = "0.0.0.0:" + std::string(argv[2]);
nlohmann::json input;
std::ifstream file(argv[1]);
try {
input = nlohmann::json::parse(file);
} catch (const nlohmann::json::exception &e) {
logger.Error("JSON Parsing exception: " + std::string(e.what()));
if (argc < 2) {
logger.Error("Usage ./jfjoch_writer <target ZMQ addr>");
exit(EXIT_FAILURE);
}
ZMQContext context;
JFJochWriterService service(context, logger);
StreamWriter writer(context, logger, argv[1]);
if (input.contains("base_directory"))
service.BaseDirectory(input["base_directory"]);
auto server = gRPCServer(grpc_addr, service);
logger.Info("gRPC configuration listening on address " + grpc_addr);
server->Wait();
while (!quitok)
writer.Run();
}

View File

@@ -1,74 +0,0 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include <sys/wait.h>
#include <fstream>
#include <nlohmann/json.hpp>
#include "../grpc/gRPCServer_Template.h"
#include "../common/Logger.h"
#include "JFJochWriterService.h"
#include "HDF5Objects.h"
int main(int argc, char **argv) {
RegisterHDF5Filter();
Logger logger("jfjoch_writer");
if (argc == 1)
logger.Error("Usage ./jfjoch_writer_multi <json config file>");
std::vector<std::string> grpc_addr;
std::string base_directory;
{
nlohmann::json input;
std::ifstream file(argv[1]);
try {
input = nlohmann::json::parse(file);
} catch (const nlohmann::json::exception &e) {
logger.Error("JSON Parsing exception: " + std::string(e.what()));
exit(EXIT_FAILURE);
}
if (input.contains("base_directory"))
base_directory = input["base_directory"];
if (!input.contains("grpc_addr"))
logger.Error("Input file needs writer services gRPC addresses");
if (!input["grpc_addr"].is_array())
logger.Error("grpc_addr must be array");
for (const auto &j: input["grpc_addr"]) {
if (j.is_number()) {
int64_t tcp_port = j.get<int64_t>();
if ((tcp_port <= 0) || (tcp_port >= UINT16_MAX))
logger.Error("tcp port {} invalid", tcp_port);
grpc_addr.push_back("0.0.0.0:" + std::to_string(tcp_port));
} else if (j.is_string()) {
grpc_addr.push_back(j.get<std::string>());
} else
logger.Error("grpc_addr array element must be string");
}
}
std::vector<pid_t> child_pids;
for (const auto &addr: grpc_addr) {
Logger logger_local(addr);
pid_t pid = fork();
if (pid == 0) {
ZMQContext context;
JFJochWriterService service(context, logger_local);
if (!base_directory.empty())
service.BaseDirectory(base_directory);
auto server = gRPCServer(addr, service);
logger_local.Info("gRPC configuration listening on address " + addr);
server->Wait();
} else
child_pids.push_back(pid);
}
int status;
for (const auto &p: child_pids)
wait(&status);
}