From 0cf8dc34a70c8cf9ccc2a8aa0232eac88df8ce8e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 12 Feb 2018 17:46:46 +0100 Subject: [PATCH] Add SF format --- src/format/SfFormat.cpp | 100 ++++++++++++++++++ ...h5_zmq_writer.cpp => mx_h5_zmq_writer.cpp} | 6 +- src/sf_h5_zmq_writer.cpp | 62 +++++++++++ 3 files changed, 165 insertions(+), 3 deletions(-) create mode 100644 src/format/SfFormat.cpp rename src/{h5_zmq_writer.cpp => mx_h5_zmq_writer.cpp} (84%) create mode 100644 src/sf_h5_zmq_writer.cpp diff --git a/src/format/SfFormat.cpp b/src/format/SfFormat.cpp new file mode 100644 index 0000000..b496af0 --- /dev/null +++ b/src/format/SfFormat.cpp @@ -0,0 +1,100 @@ +#include +#include +#include +#include +#include + +#include "../config.hpp" +#include "../H5Format.hpp" + +using namespace std; +using s_ptr = shared_ptr; + +class SfFormat : public H5Format +{ + shared_ptr> input_value_type = NULL; + shared_ptr> default_values = NULL; + shared_ptr> dataset_move_mapping = NULL; + shared_ptr file_format = NULL; + + public: + ~SfFormat(){}; + + SfFormat() + { + // Input values definition type. + // Which type should be the parameters you receive over the REST api. + input_value_type.reset( + new unordered_map({ + {"file_info/date", NX_CHAR}, + {"file_info/version", NX_CHAR}, + {"file_info/owner", NX_CHAR}, + {"file_info/instrument", NX_CHAR}, + {"experiment_info/Pgroup", NX_CHAR}, + })); + + // Default values used in the file format. + default_values.reset(new std::unordered_map({})); + + // After format has been writen, where to move the raw datasets. + dataset_move_mapping.reset(new std::unordered_map( + { + {config::raw_image_dataset_name, "data/image"}, + {"pulse_id", "data/pulse_id"}, + })); + + // Definition of the file format. + file_format.reset( + new h5_parent("", EMPTY_ROOT, { + s_ptr(new h5_group("file_info", { + s_ptr(new h5_dataset("Date", "file_info/date", NX_DATE_TIME)), + s_ptr(new h5_dataset("Version", "file_info/version", NX_CHAR)), + s_ptr(new h5_dataset("Owner", "file_info/owner", NX_CHAR)), + s_ptr(new h5_dataset("Instrument", "file_info/instrument", NX_CHAR)), + })), + + s_ptr(new h5_group("experiment_info", { + s_ptr(new h5_dataset("Pgroup", "experiment_info/Pgroup", NX_CHAR)), + })), + + s_ptr(new h5_group("data")), + })); + } + + const h5_parent& get_format_definition() const override + { + return *file_format; + } + + const unordered_map& get_default_values() const override + { + return *default_values; + } + + void add_calculated_values(unordered_map& values) const override + { + // No calculated values. + } + + void add_input_values(unordered_map& values, + const unordered_map& input_values) const override + { + // Input value mapping is 1:1. + for (const auto& input_value : input_values) { + const auto& name = input_value.first; + const auto& value = input_value.second; + + values[name] = value; + } + } + + const std::unordered_map& get_input_value_type() const override + { + return *input_value_type; + } + + const unordered_map& get_dataset_move_mapping() const override { + return *dataset_move_mapping; + } + +}; \ No newline at end of file diff --git a/src/h5_zmq_writer.cpp b/src/mx_h5_zmq_writer.cpp similarity index 84% rename from src/h5_zmq_writer.cpp rename to src/mx_h5_zmq_writer.cpp index f6f3a01..a5b6d3b 100644 --- a/src/h5_zmq_writer.cpp +++ b/src/mx_h5_zmq_writer.cpp @@ -12,7 +12,7 @@ int main (int argc, char *argv[]) { if (argc != 6) { cout << endl; - cout << "Usage: h5_zmq_writer [connection_address] [output_file] [n_frames] [rest_port] [user_id]" << endl; + cout << "Usage: mx_h5_zmq_writer [connection_address] [output_file] [n_frames] [rest_port] [user_id]" << endl; cout << "\tconnection_address: Address to connect to the stream (PULL). Example: tcp://127.0.0.1:40000" << endl; cout << "\toutput_file: Name of the output file." << endl; cout << "\tn_frames: Number of images to acquire. 0 for infinity (untill /stop is called)." << endl; @@ -28,12 +28,12 @@ int main (int argc, char *argv[]) if (user_id != -1) { #ifdef DEBUG_OUTPUT - cout << "[h5_zmq_writer::main] Setting process uid to " << user_id << endl; + cout << "[mx_h5_zmq_writer::main] Setting process uid to " << user_id << endl; #endif if (setuid(user_id)) { stringstream error_message; - error_message << "[h5_zmq_writer::main] Cannot set user_id to " << user_id << endl; + error_message << "[mx_h5_zmq_writer::main] Cannot set user_id to " << user_id << endl; throw runtime_error(error_message.str()); } diff --git a/src/sf_h5_zmq_writer.cpp b/src/sf_h5_zmq_writer.cpp new file mode 100644 index 0000000..4226862 --- /dev/null +++ b/src/sf_h5_zmq_writer.cpp @@ -0,0 +1,62 @@ +#include +#include +#include + +#include "config.hpp" +#include "ProcessManager.hpp" +#include "WriterManager.hpp" +#include "ZmqReceiver.hpp" +#include "format/SfFormat.cpp" + +int main (int argc, char *argv[]) +{ + if (argc != 6) { + cout << endl; + cout << "Usage: sf_h5_zmq_writer [connection_address] [output_file] [n_frames] [rest_port] [user_id]" << endl; + cout << "\tconnection_address: Address to connect to the stream (PULL). Example: tcp://127.0.0.1:40000" << endl; + cout << "\toutput_file: Name of the output file." << endl; + cout << "\tn_frames: Number of images to acquire. 0 for infinity (untill /stop is called)." << endl; + cout << "\trest_port: Port to start the REST Api on." << endl; + cout << "\tuser_id: uid under which to run the writer. -1 to leave it as it is." << endl; + cout << endl; + + exit(-1); + } + + // This process can be set to run under a different user. + auto user_id = atoi(argv[5]); + if (user_id != -1) { + + #ifdef DEBUG_OUTPUT + cout << "[sf_h5_zmq_writer::main] Setting process uid to " << user_id << endl; + #endif + + if (setuid(user_id)) { + stringstream error_message; + error_message << "[sf_h5_zmq_writer::main] Cannot set user_id to " << user_id << endl; + + throw runtime_error(error_message.str()); + } + } + + int n_frames = atoi(argv[3]); + string output_file = string(argv[2]); + + SfFormat format; + + WriterManager manager(format.get_input_value_type(), output_file, n_frames); + + string connect_address = string(argv[1]); + int n_io_threads = config::zmq_n_io_threads; + int receive_timeout = config::zmq_receive_timeout; + auto header_values = shared_ptr>(new unordered_map { + {"pulse_id", "uint64"}, + }); + ZmqReceiver receiver(connect_address, n_io_threads, receive_timeout); + + int rest_port = atoi(argv[4]); + + ProcessManager::run_writer(manager, format, receiver, rest_port); + + return 0; +}