switched from zmq_msg_revc to zmq_recv

This commit is contained in:
Erik Frojdh 2018-08-09 17:40:04 +02:00
parent 1102153d2b
commit dd3f4aa81d
2 changed files with 28 additions and 25 deletions

View File

@ -4961,7 +4961,7 @@ void multiSlsDetector::readFrameFromReceiver() {
rapidjson::Document doc;
if (!zmqSocket[isocket]->ReceiveHeader(isocket, doc,
SLS_DETECTOR_JSON_HEADER_VERSION)) {
zmqSocket[isocket]->CloseHeaderMessage();
// zmqSocket[isocket]->CloseHeaderMessage();
// parse error, version error or end of acquisition for socket
runningList[isocket] = false;
--numRunning;
@ -5022,7 +5022,7 @@ void multiSlsDetector::readFrameFromReceiver() {
coordX, coordY,
flippedDataX);
#endif
zmqSocket[isocket]->CloseHeaderMessage();
// zmqSocket[isocket]->CloseHeaderMessage();
}
// DATA

View File

@ -18,12 +18,13 @@
#include <rapidjson/document.h> //json header in zmq stream
#include <string.h>
#include <unistd.h> //usleep in some machines
#include <vector>
using namespace rapidjson;
#define MAX_STR_LENGTH 1000
//#define ZMQ_DETAIL
#define ZMQ_DETAIL
#define ROIVERBOSITY
class ZmqSocket {
@ -48,8 +49,8 @@ public:
portno (portnumber),
server (false),
contextDescriptor (NULL),
socketDescriptor (NULL),
headerMessage(0)
socketDescriptor (NULL)
// headerMessage(0)
{
char ip[MAX_STR_LENGTH] = "";
memset(ip, 0, MAX_STR_LENGTH);
@ -108,8 +109,8 @@ public:
portno (portnumber),
server (true),
contextDescriptor (NULL),
socketDescriptor (NULL),
headerMessage(0)
socketDescriptor (NULL)
// headerMessage(0)
{
// create context
contextDescriptor = zmq_ctx_new();
@ -389,18 +390,20 @@ public:
*/
int ReceiveHeader(const int index, Document& document, uint32_t version)
{
zmq_msg_t message;
headerMessage= &message;
zmq_msg_init (&message);
int len = ReceiveMessage(index, message);
// zmq_msg_t message;
// headerMessage= &message;
// zmq_msg_init (&message);
// int len = ReceiveMessage(index, message);
std::vector<char>buffer(MAX_STR_LENGTH);
int len = zmq_recv(socketDescriptor, buffer.data(), buffer.size(),0);
if ( len > 0 ) {
bool dummy = false;
#ifdef ZMQ_DETAIL
cprintf( BLUE,"Header %d [%d] Length: %d Header:%s \n", index, portno, len, (char*) zmq_msg_data (&message) );
cprintf( BLUE,"Header %d [%d] Length: %d Header:%s \n", index, portno, len, buffer.data());
#endif
if ( ParseHeader (index, len, message, document, dummy, version)) {
if ( ParseHeader (index, len, buffer.data(), document, dummy, version)) {
#ifdef ZMQ_DETAIL
cprintf( RED,"Parsed Header %d [%d] Length: %d Header:%s \n", index, portno, len, (char*) zmq_msg_data (&message) );
cprintf( RED,"Parsed Header %d [%d] Length: %d Header:%s \n", index, portno, len, buffer.data() );
#endif
if (dummy) {
#ifdef ZMQ_DETAIL
@ -421,11 +424,11 @@ public:
/**
* Close Header Message. Call this function if ReceiveHeader returned 1
*/
void CloseHeaderMessage() {
if (headerMessage)
zmq_msg_close(headerMessage);
headerMessage = 0;
};
// void CloseHeaderMessage() {
// if (headerMessage)
// zmq_msg_close(headerMessage);
// headerMessage = 0;
// };
/**
* Parse Header
* @param index self index for debugging
@ -436,15 +439,15 @@ public:
* @param version version that has to match, -1 to not care
* @returns true if successful else false
*/
int ParseHeader(const int index, int length, zmq_msg_t& message,
int ParseHeader(const int index, int length, char* buff,
Document& document, bool& dummy, uint32_t version)
{
if ( document.Parse( (char*) zmq_msg_data (&message), zmq_msg_size (&message)).HasParseError() ) {
cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, (char*) zmq_msg_data (&message) );
if ( document.Parse( buff, length).HasParseError() ) {
cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, buff );
fflush ( stdout );
char* buf = (char*) zmq_msg_data (&message);
// char* buf = (char*) zmq_msg_data (&message);
for ( int i= 0; i < length; ++i ) {
cprintf(RED,"%02x ",buf[i]);
cprintf(RED,"%02x ",buff[i]);
}
printf("\n");
fflush( stdout );
@ -603,6 +606,6 @@ private:
char serverAddress[1000];
/** Header Message pointer */
zmq_msg_t* headerMessage;
// zmq_msg_t* headerMessage;
};