developer: moench empty data callback (#936)

* moench (feature to wait for post processing to be done in new sls_detector_acquire_zmq executable)

READOUT_ACTION_ZMQ added to action enums
sls_detector_acquire_zmq added to executables
empty data call back so that client listens to last dummy zmq packet from moench post processor
processor: remove NEWZMQ ifdefs and remove connect for zmq publisher socket

* fix to compile

* cmds generated and parsed
This commit is contained in:
maliakal_d 2024-08-13 11:27:06 +02:00 committed by GitHub
parent c13049f144
commit de33aff077
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 30 additions and 52 deletions

View File

@ -183,8 +183,8 @@ int main(int argc, char *argv[]) {
gainfname = args["gainfile"]; gainfname = args["gainfile"];
etafname = args["etafilefile"]; etafname = args["etafilefile"];
if (atoi(args["nuninterfaces"].c_str())>1){ if (atoi(args["numinterfaces"].c_str())>1){
cprintf(RED, "Sorry, at the moment only a single interface is supported instead of %d\n",atoi(args["nuninterfaces"].c_str())); cprintf(RED, "Sorry, at the moment only a single interface is supported instead of %d\n",atoi(args["numinterfaces"].c_str()));
return EXIT_FAILURE; return EXIT_FAILURE;
} }
@ -272,54 +272,34 @@ int main(int argc, char *argv[]) {
sls::ZmqSocket *zmqsocket = NULL; sls::ZmqSocket *zmqsocket = NULL;
#ifdef NEWZMQ
// receive socket // receive socket
try { try {
#endif
zmqsocket = new sls::ZmqSocket(socketip.c_str(), portnum); zmqsocket = new sls::ZmqSocket(socketip.c_str(), portnum);
#ifdef NEWZMQ
} catch (...) { } catch (...) {
cprintf(RED, cprintf(RED,
"Error: Could not create Zmq socket on port %d with ip %s\n", "Error: Could not create Zmq receiving socket on port %d with ip %s\n",
portnum, socketip.c_str()); portnum, socketip.c_str());
delete zmqsocket; delete zmqsocket;
return EXIT_FAILURE; return EXIT_FAILURE;
} }
#endif
#ifndef NEWZMQ
if (zmqsocket->IsError()) {
cprintf(RED,
"Error: Could not create Zmq socket on port %d with ip %s\n",
portnum, socketip.c_str());
delete zmqsocket;
return EXIT_FAILURE;
}
#endif
if (zmqsocket->Connect()) { if (zmqsocket->Connect()) {
cprintf(RED, "Error: Could not connect to socket %s\n", cprintf(RED, "Error: Could not connect to Zmq receiving socket %s\n",
(zmqsocket->GetZmqServerAddress()).c_str()); (zmqsocket->GetZmqServerAddress()).c_str());
delete zmqsocket; delete zmqsocket;
return EXIT_FAILURE; return EXIT_FAILURE;
} else } else
printf("Zmq Client at %s\n", zmqsocket->GetZmqServerAddress().c_str()); printf("Zmq receiving socket at %s\n", zmqsocket->GetZmqServerAddress().c_str());
// send socket // send socket
sls::ZmqSocket *zmqsocket2 = 0; sls::ZmqSocket *zmqsocket2 = 0;
// cout << "zmq2 " << endl; // cout << "zmq2 " << endl;
if (send) { if (send) {
#ifdef NEWZMQ
// receive socket // receive socket
try { try {
#endif zmqsocket2 = new sls::ZmqSocket(portnum2, socketip2.c_str());
zmqsocket2 = new sls::ZmqSocket(portnum2, socketip2.c_str());
#ifdef NEWZMQ
} catch (...) { } catch (...) {
cprintf(RED, cprintf(RED,
"Error: Could not create Zmq socket server on port %d and " "Error: Could not create Zmq sending socket on port %d and "
"ip %s\n", "ip %s\n",
portnum2, socketip2.c_str()); portnum2, socketip2.c_str());
// delete zmqsocket2; // delete zmqsocket2;
@ -328,28 +308,7 @@ int main(int argc, char *argv[]) {
// return EXIT_FAILURE; // return EXIT_FAILURE;
send = false; send = false;
} }
#endif printf("Zmq sending socket at %s\n",
#ifndef NEWZMQ
if (zmqsocket2->IsError()) {
cprintf(RED,
"AAA Error: Could not create Zmq socket server on port %d "
"and ip %s\n",
portnum2, socketip2.c_str());
// delete zmqsocket2;
// delete zmqsocket;
// return EXIT_FAILURE;
send = false;
}
#endif
if (zmqsocket2->Connect()) {
cprintf(RED, "BBB Error: Could not connect to socket %s\n",
zmqsocket2->GetZmqServerAddress().c_str());
// delete zmqsocket2;
send = false;
// return EXIT_FAILURE;
} else
printf("Zmq Client at %s\n",
zmqsocket2->GetZmqServerAddress().c_str()); zmqsocket2->GetZmqServerAddress().c_str());
} }

View File

@ -82,10 +82,11 @@ if(SLS_USE_TEXTCLIENT)
set(det_bin_names "sls_detector_put" set(det_bin_names "sls_detector_put"
"sls_detector_get" "sls_detector_get"
"sls_detector_acquire" "sls_detector_acquire"
"sls_detector_acquire_zmq"
"sls_detector_help" "sls_detector_help"
"sls_detector" "sls_detector"
) )
set(det_cmd_name "PUT" "GET" "READOUT" "HELP" "INFER") set(det_cmd_name "PUT" "GET" "READOUT" "READOUTZMQ" "HELP" "INFER")
list(LENGTH det_bin_names len1) list(LENGTH det_bin_names len1)
math(EXPR len2 "${len1} - 1") math(EXPR len2 "${len1} - 1")

View File

@ -71,6 +71,8 @@ class Caller {
using StringMap = std::map<std::string, std::string>; using StringMap = std::map<std::string, std::string>;
Detector *ptr; // pointer to the detector that executes the command Detector *ptr; // pointer to the detector that executes the command
static void EmptyDataCallBack(detectorData *data, uint64_t frameIndex, uint32_t subFrameIndex, void *this_pointer);
FunctionMap functions{ FunctionMap functions{
{"list", &Caller::list}, {"list", &Caller::list},

View File

@ -397,6 +397,9 @@ class Caller {
using StringMap = std::map<std::string, std::string>; using StringMap = std::map<std::string, std::string>;
Detector *ptr; // pointer to the detector that executes the command Detector *ptr; // pointer to the detector that executes the command
static void EmptyDataCallBack(detectorData *data, uint64_t frameIndex,
uint32_t subFrameIndex, void *this_pointer);
FunctionMap functions{ FunctionMap functions{
{"list", &Caller::list}, {"list", &Caller::list},

View File

@ -212,6 +212,12 @@ std::string Caller::hostname(int action) {
} }
return os.str(); return os.str();
} }
void Caller::EmptyDataCallBack(detectorData *data, uint64_t frameIndex,
uint32_t subFrameIndex, void *this_pointer) {
LOG(logDEBUG) << "EmptyDataCallBack to start up zmq sockets";
}
std::string Caller::acquire(int action) { std::string Caller::acquire(int action) {
std::ostringstream os; std::ostringstream os;
if (action == defs::HELP_ACTION) { if (action == defs::HELP_ACTION) {
@ -232,6 +238,9 @@ std::string Caller::acquire(int action) {
if (det_id >= 0) { if (det_id >= 0) {
throw RuntimeError("Individual detectors not allowed for readout."); throw RuntimeError("Individual detectors not allowed for readout.");
} }
if (action == defs::READOUT_ZMQ_ACTION) {
det->registerDataCallback(&(EmptyDataCallBack), this);
}
det->acquire(); det->acquire();
if (det->getUseReceiverFlag().squash(false)) { if (det->getUseReceiverFlag().squash(false)) {

View File

@ -20,6 +20,10 @@ int main(int argc, char *argv[]) {
int action = slsDetectorDefs::READOUT_ACTION; int action = slsDetectorDefs::READOUT_ACTION;
#endif #endif
#ifdef READOUTZMQ
int action = slsDetectorDefs::READOUT_ZMQ_ACTION;
#endif
#ifdef HELP #ifdef HELP
int action = slsDetectorDefs::HELP_ACTION; int action = slsDetectorDefs::HELP_ACTION;
#endif #endif
@ -38,7 +42,7 @@ int main(int argc, char *argv[]) {
sls::CmdParser parser; sls::CmdParser parser;
parser.Parse(argc, argv); parser.Parse(argc, argv);
if (action == slsDetectorDefs::READOUT_ACTION) if (action == slsDetectorDefs::READOUT_ACTION || action == slsDetectorDefs::READOUT_ZMQ_ACTION)
parser.setCommand("acquire"); parser.setCommand("acquire");
if (parser.isHelp()) if (parser.isHelp())

View File

@ -216,7 +216,7 @@ typedef struct {
/** /**
type of action performed (for text client) type of action performed (for text client)
*/ */
enum { GET_ACTION, PUT_ACTION, READOUT_ACTION, HELP_ACTION }; enum { GET_ACTION, PUT_ACTION, READOUT_ACTION, HELP_ACTION, READOUT_ZMQ_ACTION };
/** /**
dimension indexes dimension indexes