Compare commits

...

9 Commits

Author SHA1 Message Date
cb8403f1b0 zmq 2020-03-18 12:38:06 +01:00
b751238fc1 ZmqSocket 2020-03-18 12:12:01 +01:00
5479d3a198 fix 2020-03-18 11:11:00 +01:00
e1768905dd build interface 2020-03-18 11:06:37 +01:00
775d0842e9 build interface 2020-03-18 11:00:54 +01:00
e7e201bd2a export and include 2020-03-18 10:20:01 +01:00
ec9f8305e9 not export gui 2020-03-18 08:41:25 +01:00
3307bfab1b fix 2020-03-17 19:11:29 +01:00
ce2c62000d include cmake in build 2020-03-17 18:57:19 +01:00
13 changed files with 649 additions and 599 deletions

View File

@ -5,18 +5,20 @@ mkdir $PREFIX/include
mkdir $PREFIX/include/slsDetectorPackage
#Shared and static libraries
cp build/bin/libSlsDetector.so $PREFIX/lib/.
cp build/bin/libSlsReceiver.so $PREFIX/lib/.
cp build/bin/libSlsSupport.so $PREFIX/lib/.
# cp build/bin/libSlsDetector.so $PREFIX/lib/.
# cp build/bin/libSlsReceiver.so $PREFIX/lib/.
# cp build/bin/libSlsSupport.so $PREFIX/lib/.
cp build/install/lib/* $PREFIX/lib/
#Binaries
cp build/bin/sls_detector_acquire $PREFIX/bin/.
cp build/bin/sls_detector_get $PREFIX/bin/.
cp build/bin/sls_detector_put $PREFIX/bin/.
cp build/bin/sls_detector_help $PREFIX/bin/.
cp build/bin/slsReceiver $PREFIX/bin/.
cp build/bin/slsMultiReceiver $PREFIX/bin/.
cp build/install/bin/sls_detector_acquire $PREFIX/bin/.
cp build/install/bin/sls_detector_get $PREFIX/bin/.
cp build/install/bin/sls_detector_put $PREFIX/bin/.
cp build/install/bin/sls_detector_help $PREFIX/bin/.
cp build/install/bin/slsReceiver $PREFIX/bin/.
cp build/install/bin/slsMultiReceiver $PREFIX/bin/.
#Which headers do we need for development??
cp build/install/include/* $PREFIX/include/slsDetectorPackage/
# cp include/some_lib.h $PREFIX/include/.
cp build/install/include/* $PREFIX/include/
cp -r build/install/share/ $PREFIX/share

View File

@ -7,7 +7,7 @@ source:
- path: ..
build:
number: 2
number: 0
binary_relocation: True
rpaths:
- lib/
@ -37,6 +37,7 @@ requirements:
host:
- libstdcxx-ng
- libgcc-ng
- zeromq
- xorg-libx11
- xorg-libice
- xorg-libxext
@ -46,6 +47,7 @@ requirements:
- xorg-libxfixes
run:
- zeromq
- libstdcxx-ng
- libgcc-ng

View File

@ -45,7 +45,7 @@ ext_modules = [
# get_pybind_include(),
# get_pybind_include(user=True),
os.path.join('../libs/pybind11/include'),
os.path.join(get_conda_path(), 'include/slsDetectorPackage'),
os.path.join(get_conda_path(), 'include'),
],
libraries=['SlsDetector', 'SlsReceiver', 'zmq'],

View File

@ -103,10 +103,10 @@ set_target_properties(slsDetectorGui PROPERTIES
)
install(TARGETS slsDetectorGui
EXPORT "${TARGETS_EXPORT_NAME}"
# EXPORT "${TARGETS_EXPORT_NAME}"
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
# LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
# ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
# PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
)

View File

@ -1,5 +1,5 @@
#include "SlsQt2DPlot.h"
#include "ansi.h"
// #include "ansi.h"
#include <qlist.h>
#include <qprinter.h>

View File

@ -15,7 +15,6 @@ add_library(slsDetectorShared SHARED
${HEADERS}
)
# Do we have link time optimization?
check_ipo_supported(RESULT LTO_AVAILABLE)
if(LTO_AVAILABLE)
@ -28,15 +27,18 @@ target_include_directories(slsDetectorShared PUBLIC
"$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>"
)
target_link_libraries(slsDetectorShared PUBLIC
slsProjectOptions
slsProjectWarnings
target_link_libraries(slsDetectorShared
PUBLIC
slsSupportLib
${ZeroMQ_LIBRARIES}
pthread
rt
slsProjectOptions
PRIVATE
slsProjectWarnings
${ZeroMQ_LIBRARIES}
)
set(PUBLICHEADERS
include/slsDetectorUsers.h
include/detectorData.h
@ -69,7 +71,7 @@ foreach(val RANGE ${len2})
target_link_libraries(${val1}
slsDetectorShared
pthread
${ZeroMQ_LIBRARIES}
zmq
rt
)
set_target_properties(${val1} PROPERTIES

View File

@ -8,7 +8,6 @@
*@short functions basic implemenation of shared memory
*/
#include "ansi.h"
#include "logger.h"
#include "sls_detector_exceptions.h"
@ -18,7 +17,7 @@
#include <fcntl.h> // O_CREAT, O_TRUNC..
#include <iostream>
#include <sstream>
#include <stdio.h> // printf
// #include <stdio.h> // printf
#include <sys/mman.h> // shared memory
#include <sys/stat.h> // fstat
#include <unistd.h>

View File

@ -9,7 +9,6 @@
*/
#include "ansi.h"
#include "logger.h"
#include <string>

View File

@ -15,6 +15,7 @@
#include <fstream>
#include <iostream>
#include <sys/stat.h> // stat
#include <unistd.h>
/** cosntructor & destructor */

View File

@ -6,6 +6,7 @@ set(SOURCES
src/ServerSocket.cpp
src/ServerInterface.cpp
src/network_utils.cpp
src/ZmqSocket.cpp
)
set(HEADERS
@ -43,22 +44,36 @@ if(result)
endif()
target_include_directories(slsSupportLib PUBLIC
${ZeroMQ_INCLUDE_DIRS}
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>"
"$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>"
)
target_include_directories(slsSupportLib PRIVATE
${ZeroMQ_INCLUDE_DIRS}
)
set_target_properties(slsSupportLib PROPERTIES
LIBRARY_OUTPUT_NAME SlsSupport
LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
PUBLIC_HEADER "${PUBLICHEADERS}"
)
message(${ZeroMQ_LIBRARIES})
target_link_libraries(slsSupportLib
PUBLIC
slsProjectOptions
# ${ZeroMQ_LIBRARIES}
zmq
PRIVATE
slsProjectWarnings
${ZeroMQ_LIBRARIES}
rapidjson)
PUBLIC
rapidjson
)
if (SLS_USE_TESTS)
add_subdirectory(tests)

View File

@ -7,20 +7,8 @@
*@short functions to open/close zmq sockets
*/
#include "ansi.h"
#include "sls_detector_exceptions.h"
#include <arpa/inet.h> //inet_ntoa
#include <errno.h>
#include <iostream>
#include <netdb.h> //gethostbyname()
#include <rapidjson/document.h> //json header in zmq stream
#include <string.h>
#include <unistd.h> //usleep in some machines
#include <vector>
#include <zmq.h>
using namespace rapidjson;
#define MAX_STR_LENGTH 1000
@ -28,14 +16,14 @@ using namespace rapidjson;
#define ROIVERBOSITY
class zmq_msg_t;
class ZmqSocket {
public:
// Socket Options for optimization
//ZMQ_LINGER default is already -1 means no messages discarded. use this options if optimizing required
//ZMQ_SNDHWM default is 0 means no limit. use this to optimize if optimizing required
// ZMQ_LINGER default is already -1 means no messages discarded. use this
// options if optimizing required ZMQ_SNDHWM default is 0 means no limit. use
// this to optimize if optimizing required
// eg. int value = -1;
// if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) {
// Close();
@ -45,55 +33,7 @@ public:
* @param hostname hostname or ip of server
* @param portnumber port number
*/
ZmqSocket (const char* const hostname_or_ip, const uint32_t portnumber):
portno (portnumber)
// headerMessage(0)
{
char ip[MAX_STR_LENGTH] = "";
memset(ip, 0, MAX_STR_LENGTH);
// convert hostname to ip (not required, but a test that returns if failed)
struct addrinfo *result;
if ((ConvertHostnameToInternetAddress(hostname_or_ip, &result)) ||
(ConvertInternetAddresstoIpString(result, ip, MAX_STR_LENGTH)))
throw sls::ZmqSocketError("Could convert IP to string");
// construct address
sprintf (sockfd.serverAddress, "tcp://%s:%d", ip, portno);
#ifdef VERBOSE
cprintf(BLUE,"address:%s\n",sockfd.serverAddress);
#endif
// create context
sockfd.contextDescriptor = zmq_ctx_new();
if (sockfd.contextDescriptor == 0)
throw sls::ZmqSocketError("Could not create contextDescriptor");
// create publisher
sockfd.socketDescriptor = zmq_socket (sockfd.contextDescriptor, ZMQ_SUB);
if (sockfd.socketDescriptor == 0) {
PrintError ();
Close ();
throw sls::ZmqSocketError("Could not create socket");
}
//Socket Options provided above
// an empty string implies receiving any messages
if ( zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) {
PrintError ();
Close();
throw sls::ZmqSocketError("Could set socket opt");
}
//ZMQ_LINGER default is already -1 means no messages discarded. use this options if optimizing required
//ZMQ_SNDHWM default is 0 means no limit. use this to optimize if optimizing required
// eg. int value = -1;
int value = 0;
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) {
PrintError ();
Close();
throw sls::ZmqSocketError("Could not set ZMQ_LINGER");
}
};
ZmqSocket(const char *const hostname_or_ip, const uint32_t portnumber);
/**
* Constructor for a server
@ -102,80 +42,37 @@ public:
* @param portnumber port number
* @param ethip is the ip of the ethernet interface to stream zmq from
*/
ZmqSocket (const uint32_t portnumber, const char *ethip):
portno (portnumber)
// headerMessage(0)
{
sockfd.server = true;
// create context
sockfd.contextDescriptor = zmq_ctx_new();
if (sockfd.contextDescriptor == 0)
throw sls::ZmqSocketError("Could not create contextDescriptor");
// create publisher
sockfd.socketDescriptor = zmq_socket (sockfd.contextDescriptor, ZMQ_PUB);
if (sockfd.socketDescriptor == 0) {
PrintError ();
Close ();
throw sls::ZmqSocketError("Could not create socket");
}
//Socket Options provided above
// construct addresss
sprintf (sockfd.serverAddress,"tcp://%s:%d", ethip, portno);
#ifdef VERBOSE
cprintf(BLUE,"address:%s\n",sockfd.serverAddress);
#endif
// bind address
if (zmq_bind (sockfd.socketDescriptor, sockfd.serverAddress) < 0) {
PrintError ();
Close ();
throw sls::ZmqSocketError("Could not bind socket");
}
//sleep for a few milliseconds to allow a slow-joiner
usleep(200* 1000);
};
ZmqSocket(const uint32_t portnumber, const char *ethip);
/**
* Destructor
*/
~ZmqSocket () {
//mySocketDescriptor destructor also gets called
};
~ZmqSocket() = default;
/**
* Returns Port Number
* @returns Port Number
*/
uint32_t GetPortNumber () { return portno; };
uint32_t GetPortNumber() { return portno; }
/**
* Returns Server Address
* @returns Server Address
*/
char* GetZmqServerAddress () { return sockfd.serverAddress; };
char *GetZmqServerAddress() { return sockfd.serverAddress; }
/**
* Returns Socket Descriptor
* @reutns Socket descriptor
*/
void* GetsocketDescriptor () { return sockfd.socketDescriptor; };
void *GetsocketDescriptor() { return sockfd.socketDescriptor; }
/**
* Connect client socket to server socket
* @returns 1 for fail, 0 for success
*/
int Connect() {
if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress) < 0) {
PrintError ();
return 1;
}
return 0;
}
int Connect();
/**
* Unbinds the Socket
@ -195,28 +92,8 @@ public:
* @return 1 for fail, 0 for success
*/
// Do not make this static (for multi threading environment)
int ConvertHostnameToInternetAddress (const char* const hostname, struct addrinfo **res) {
// criteria in selecting socket address structures returned by res
struct addrinfo hints;
memset (&hints, 0, sizeof (hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
// get host info into res
int errcode = getaddrinfo (hostname, NULL, &hints, res);
if (errcode != 0) {
LOG(logERROR) << "Error: Could not convert hostname " << hostname << " to internet address (zmq):"
<< gai_strerror(errcode);
} else {
if (*res == NULL) {
LOG(logERROR) << "Could not convert hostname " << hostname << " to internet address (zmq): "
"gettaddrinfo returned null";
} else{
return 0;
}
}
LOG(logERROR) << "Could not convert hostname to internet address";
return 1;
};
int ConvertHostnameToInternetAddress(const char *const hostname,
struct addrinfo **res);
/**
* Convert Internet Address structure pointer to ip string (char*)
@ -227,16 +104,8 @@ public:
* @return 1 for fail, 0 for success
*/
// Do not make this static (for multi threading environment)
int ConvertInternetAddresstoIpString (struct addrinfo *res, char* ip, const int ipsize) {
if (inet_ntop (res->ai_family, &((struct sockaddr_in *) res->ai_addr)->sin_addr, ip, ipsize) != NULL) {
freeaddrinfo(res);
return 0;
}
LOG(logERROR) << "Could not convert internet address to ip string";
return 1;
}
int ConvertInternetAddresstoIpString(struct addrinfo *res, char *ip,
const int ipsize);
/**
* Send Message Header
@ -263,95 +132,25 @@ public:
* @param roundRNumber not used yet
* @param detType detector enum
* @param version detector header version
* @param gapPixelsEnable gap pixels enable (exception: if gap pixels enable for 4 bit mode, data is not yet gap pixel enabled in receiver)
* @param gapPixelsEnable gap pixels enable (exception: if gap pixels enable
* for 4 bit mode, data is not yet gap pixel enabled in receiver)
* @param flippedDataX if it is flipped across x axis
* @param quadEnable if quad is enabled
* @param additionalJsonHeader additional json header
* @returns 0 if error, else 1
*/
int SendHeaderData ( int index, bool dummy, uint32_t jsonversion, uint32_t dynamicrange = 0, uint64_t fileIndex = 0,
uint32_t ndetx = 0, uint32_t ndety = 0, uint32_t npixelsx = 0, uint32_t npixelsy = 0, uint32_t imageSize = 0,
uint64_t acqIndex = 0, uint64_t fIndex = 0, const char* fname = NULL,
uint64_t frameNumber = 0, uint32_t expLength = 0, uint32_t packetNumber = 0,
uint64_t bunchId = 0, uint64_t timestamp = 0,
uint16_t modId = 0, uint16_t row = 0, uint16_t column = 0, uint16_t reserved = 0,
uint32_t debug = 0, uint16_t roundRNumber = 0,
uint8_t detType = 0, uint8_t version = 0, int gapPixelsEnable = 0, int flippedDataX = 0,
uint32_t quadEnable = 0,
std::string* additionalJsonHeader = 0) {
/** Json Header Format */
const char jsonHeaderFormat[] =
"{"
"\"jsonversion\":%u, "
"\"bitmode\":%u, "
"\"fileIndex\":%lu, "
"\"detshape\":[%u, %u], "
"\"shape\":[%u, %u], "
"\"size\":%u, "
"\"acqIndex\":%lu, "
"\"fIndex\":%lu, "
"\"fname\":\"%s\", "
"\"data\": %d, "
"\"frameNumber\":%lu, "
"\"expLength\":%u, "
"\"packetNumber\":%u, "
"\"bunchId\":%lu, "
"\"timestamp\":%lu, "
"\"modId\":%u, "
"\"row\":%u, "
"\"column\":%u, "
"\"reserved\":%u, "
"\"debug\":%u, "
"\"roundRNumber\":%u, "
"\"detType\":%u, "
"\"version\":%u, "
//additional stuff
"\"gappixels\":%u, "
"\"flippedDataX\":%u, "
"\"quad\":%u"
;//"}\n";
char buf[MAX_STR_LENGTH] = "";
sprintf(buf, jsonHeaderFormat,
jsonversion, dynamicrange, fileIndex, ndetx, ndety, npixelsx, npixelsy, imageSize,
acqIndex, fIndex, (fname == NULL)? "":fname, dummy?0:1,
frameNumber, expLength, packetNumber, bunchId, timestamp,
modId, row, column, reserved, debug, roundRNumber,
detType, version,
//additional stuff
gapPixelsEnable,
flippedDataX,
quadEnable
);
if (additionalJsonHeader && !((*additionalJsonHeader).empty())) {
strcat(buf, ", ");
strcat(buf, (*additionalJsonHeader).c_str());
}
strcat(buf, "}\n");
int length = strlen(buf);
#ifdef VERBOSE
//if(!index)
cprintf(BLUE,"%d : Streamer: buf: %s\n", index, buf);
#endif
if(zmq_send (sockfd.socketDescriptor, buf, length, dummy?0:ZMQ_SNDMORE) < 0) {
PrintError ();
return 0;
}
#ifdef VERBOSE
cprintf(GREEN,"[%u] send header data\n",portno);
#endif
return 1;
};
int SendHeaderData(
int index, bool dummy, uint32_t jsonversion, uint32_t dynamicrange = 0,
uint64_t fileIndex = 0, uint32_t ndetx = 0, uint32_t ndety = 0,
uint32_t npixelsx = 0, uint32_t npixelsy = 0, uint32_t imageSize = 0,
uint64_t acqIndex = 0, uint64_t fIndex = 0, const char *fname = nullptr,
uint64_t frameNumber = 0, uint32_t expLength = 0,
uint32_t packetNumber = 0, uint64_t bunchId = 0, uint64_t timestamp = 0,
uint16_t modId = 0, uint16_t row = 0, uint16_t column = 0,
uint16_t reserved = 0, uint32_t debug = 0, uint16_t roundRNumber = 0,
uint8_t detType = 0, uint8_t version = 0, int gapPixelsEnable = 0,
int flippedDataX = 0, uint32_t quadEnable = 0,
std::string *additionalJsonHeader = 0);
/**
* Send Message Body
@ -359,73 +158,19 @@ public:
* @param length length of message
* @returns 0 if error, else 1
*/
int SendData (char* buf, int length) {
if(zmq_send (sockfd.socketDescriptor, buf, length, 0) < 0) {
PrintError ();
return 0;
}
#ifdef VERBOSE
cprintf(GREEN,"[%u] send data\n",portno);
#endif
return 1;
};
int SendData(char *buf, int length);
/**
* Receive Message
* @param index self index for debugging
* @param message message
* @returns length of message, -1 if error
*/
int ReceiveMessage(const int index, zmq_msg_t& message) {
int length = zmq_msg_recv (&message, sockfd.socketDescriptor, 0);
if (length == -1) {
PrintError ();
LOG(logERROR) << "Could not read header for socket " << index;
}
#ifdef VERBOSE
else
cprintf( RED,"Message %d Length: %d Header:%s \n", index, length, (char*) zmq_msg_data (&message) );
#endif
return length;
};
/**
* Receive Header (Important to close message after parsing header)
* @param index self index for debugging
* @param document parsed document reference
* @param version version that has to match, -1 to not care
* @returns 0 if error or end of acquisition, else 1 (call CloseHeaderMessage after parsing header)
* @returns 0 if error or end of acquisition, else 1 (call
* CloseHeaderMessage after parsing header)
*/
int ReceiveHeader(const int index, Document& document, uint32_t version)
{
std::vector<char>buffer(MAX_STR_LENGTH);
int len = zmq_recv(sockfd.socketDescriptor, buffer.data(), buffer.size(),0);
if ( len > 0 ) {
bool dummy = false;
#ifdef ZMQ_DETAIL
cprintf( BLUE,"Header %d [%d] Length: %d Header:%s \n", index, portno, len, buffer.data());
#endif
if ( ParseHeader (index, len, buffer.data(), document, dummy, version)) {
#ifdef ZMQ_DETAIL
cprintf( RED,"Parsed Header %d [%d] Length: %d Header:%s \n", index, portno, len, buffer.data() );
#endif
if (dummy) {
#ifdef ZMQ_DETAIL
cprintf(RED,"%d [%d] Received end of acquisition\n", index, portno );
#endif
return 0;
}
#ifdef ZMQ_DETAIL
cprintf(GREEN,"%d [%d] data\n",index, portno );
#endif
return 1;
}
}
return 0;
};
int ReceiveHeader(const int index, rapidjson::Document &document, uint32_t version);
/**
* Close Header Message. Call this function if ReceiveHeader returned 1
@ -445,33 +190,8 @@ public:
* @param version version that has to match, -1 to not care
* @returns true if successful else false
*/
int ParseHeader(const int index, int length, char* buff,
Document& document, bool& dummy, uint32_t version)
{
if ( document.Parse( buff, length).HasParseError() ) {
LOG(logERROR) << index << " Could not parse. len:" << length << ": Message:" << buff;
fflush ( stdout );
// char* buf = (char*) zmq_msg_data (&message);
for ( int i= 0; i < length; ++i ) {
cprintf(RED,"%02x ",buff[i]);
}
printf("\n");
fflush( stdout );
return 0;
}
if (document["jsonversion"].GetUint() != version) {
LOG(logERROR) << "version mismatch. required " << version << ", got " << document["jsonversion"].GetUint();
return 0;
}
dummy = false;
int temp = document["data"].GetUint();
dummy = temp ? false : true;
return 1;
};
int ParseHeader(const int index, int length, char *buff, rapidjson::Document &document,
bool &dummy, uint32_t version);
/**
* Receive Data
@ -480,134 +200,36 @@ public:
* @param size size of image
* @returns length of data received
*/
int ReceiveData(const int index, char* buf, const int size)
{
zmq_msg_t message;
zmq_msg_init (&message);
int length = ReceiveMessage(index, message);
//actual data
if (length == size) {
#ifdef VERBOSE
cprintf(BLUE,"%d actual data\n", index);
#endif
memcpy(buf, (char*)zmq_msg_data(&message), size);
}
//incorrect size (smaller)
else if (length < size){
#ifdef ROIVERBOSITY
cprintf(RED,"Error: Received smaller packet size %d for socket %d\n", length, index);
#endif
memcpy(buf, (char*)zmq_msg_data(&message), length);
memset(buf+length,0xFF,size-length);
}
//incorrect size (larger)
else {
LOG(logERROR) << "Received weird packet size " << length << " for socket " << index;
memset(buf,0xFF,size);
}
zmq_msg_close(&message);
return length;
};
int ReceiveData(const int index, char *buf, const int size);
/**
* Print error
*/
void PrintError () {
switch (errno) {
case EINVAL:
LOG(logERROR) << "The socket type/option or value/endpoint supplied is invalid (zmq)";
break;
case EAGAIN:
LOG(logERROR) << "Non-blocking mode was requested and the message cannot be sent/available at the moment (zmq)";
break;
case ENOTSUP:
LOG(logERROR) << "The zmq_send()/zmq_msg_recv() operation is not supported by this socket type (zmq)";
break;
case EFSM:
LOG(logERROR) << "The zmq_send()/zmq_msg_recv() unavailable now as socket in inappropriate state (eg. ZMQ_REP). Look up messaging patterns (zmq)";
break;
case EFAULT:
LOG(logERROR) << "The provided context/message is invalid (zmq)";
break;
case EMFILE:
LOG(logERROR) << "The limit on the total number of open ØMQ sockets has been reached (zmq)";
break;
case EPROTONOSUPPORT:
LOG(logERROR) << "The requested transport protocol is not supported (zmq)";
break;
case ENOCOMPATPROTO:
LOG(logERROR) << "The requested transport protocol is not compatible with the socket type (zmq)";
break;
case EADDRINUSE:
LOG(logERROR) << "The requested address is already in use (zmq)";
break;
case EADDRNOTAVAIL:
LOG(logERROR) << "The requested address was not local (zmq)";
break;
case ENODEV:
LOG(logERROR) << "The requested address specifies a nonexistent interface (zmq)";
break;
case ETERM:
LOG(logERROR) << "The ØMQ context associated with the specified socket was terminated (zmq)";
break;
case ENOTSOCK:
LOG(logERROR) << "The provided socket was invalid (zmq)";
break;
case EINTR:
LOG(logERROR) << "The operation was interrupted by delivery of a signal (zmq)";
break;
case EMTHREAD:
LOG(logERROR) << "No I/O thread is available to accomplish the task (zmq)";
break;
default:
LOG(logERROR) << "Unknown socket error (zmq)";
break;
}
};
void PrintError();
private:
/**
* Receive Message
* @param index self index for debugging
* @param message message
* @returns length of message, -1 if error
*/
int ReceiveMessage(const int index, zmq_msg_t &message);
/**
* Class to close socket descriptors automatically
* upon encountering exceptions in the ZmqSocket constructor
*/
class mySocketDescriptors {
public:
/** Constructor */
mySocketDescriptors():
server(false),
contextDescriptor(0),
socketDescriptor(0) {};
mySocketDescriptors();
/** Destructor */
~mySocketDescriptors() {
Disconnect();
Close();
}
~mySocketDescriptors();
/** Unbinds the Socket */
void Disconnect () {
if (server)
zmq_unbind (socketDescriptor, serverAddress);
else
zmq_disconnect (socketDescriptor, serverAddress);
};
void Disconnect();
/** Close Socket and destroy Context */
void Close () {
if (socketDescriptor != NULL) {
zmq_close (socketDescriptor);
socketDescriptor = NULL;
}
if (contextDescriptor != NULL) {
zmq_ctx_destroy (contextDescriptor);
contextDescriptor = NULL;
}
};
void Close();
/** true if server, else false */
bool server;
/** Server Address */

View File

@ -14,8 +14,6 @@
#define __cplusplus
#endif
#include "ansi.h"
#ifdef __cplusplus
//C++ includes
#include "sls_detector_exceptions.h"

View File

@ -0,0 +1,410 @@
#include "ZmqSocket.h"
#include <zmq.h>
#include <vector>
#include <arpa/inet.h> //inet_ntoa
#include <errno.h>
#include <iostream>
#include <netdb.h> //gethostbyname()
#include <string.h>
#include <unistd.h> //usleep in some machines
using namespace rapidjson;
ZmqSocket::ZmqSocket(const char *const hostname_or_ip,
const uint32_t portnumber)
: portno(portnumber)
// headerMessage(0)
{
char ip[MAX_STR_LENGTH] = "";
memset(ip, 0, MAX_STR_LENGTH);
// convert hostname to ip (not required, but a test that returns if failed)
struct addrinfo *result;
if ((ConvertHostnameToInternetAddress(hostname_or_ip, &result)) ||
(ConvertInternetAddresstoIpString(result, ip, MAX_STR_LENGTH)))
throw sls::ZmqSocketError("Could convert IP to string");
// construct address
sprintf(sockfd.serverAddress, "tcp://%s:%d", ip, portno);
#ifdef VERBOSE
cprintf(BLUE, "address:%s\n", sockfd.serverAddress);
#endif
// create context
sockfd.contextDescriptor = zmq_ctx_new();
if (sockfd.contextDescriptor == 0)
throw sls::ZmqSocketError("Could not create contextDescriptor");
// create publisher
sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_SUB);
if (sockfd.socketDescriptor == 0) {
PrintError();
Close();
throw sls::ZmqSocketError("Could not create socket");
}
// Socket Options provided above
// an empty string implies receiving any messages
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) {
PrintError();
Close();
throw sls::ZmqSocketError("Could set socket opt");
}
// ZMQ_LINGER default is already -1 means no messages discarded. use this
// options if optimizing required ZMQ_SNDHWM default is 0 means no limit.
// use this to optimize if optimizing required eg. int value = -1;
int value = 0;
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_LINGER, &value,
sizeof(value))) {
PrintError();
Close();
throw sls::ZmqSocketError("Could not set ZMQ_LINGER");
}
}
ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip)
:
portno(portnumber)
// headerMessage(0)
{
sockfd.server = true;
// create context
sockfd.contextDescriptor = zmq_ctx_new();
if (sockfd.contextDescriptor == 0)
throw sls::ZmqSocketError("Could not create contextDescriptor");
// create publisher
sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_PUB);
if (sockfd.socketDescriptor == 0) {
PrintError();
Close();
throw sls::ZmqSocketError("Could not create socket");
}
// Socket Options provided above
// construct addresss
sprintf(sockfd.serverAddress, "tcp://%s:%d", ethip, portno);
#ifdef VERBOSE
cprintf(BLUE, "address:%s\n", sockfd.serverAddress);
#endif
// bind address
if (zmq_bind(sockfd.socketDescriptor, sockfd.serverAddress) < 0) {
PrintError();
Close();
throw sls::ZmqSocketError("Could not bind socket");
}
// sleep for a few milliseconds to allow a slow-joiner
usleep(200 * 1000);
};
int ZmqSocket::Connect() {
if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress) < 0) {
PrintError();
return 1;
}
return 0;
}
int ZmqSocket::ConvertHostnameToInternetAddress(const char *const hostname,
struct addrinfo **res) {
// criteria in selecting socket address structures returned by res
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
// get host info into res
int errcode = getaddrinfo(hostname, NULL, &hints, res);
if (errcode != 0) {
LOG(logERROR) << "Error: Could not convert hostname " << hostname
<< " to internet address (zmq):" << gai_strerror(errcode);
} else {
if (*res == NULL) {
LOG(logERROR) << "Could not convert hostname " << hostname
<< " to internet address (zmq): "
"gettaddrinfo returned null";
} else {
return 0;
}
}
LOG(logERROR) << "Could not convert hostname to internet address";
return 1;
};
int ZmqSocket::ConvertInternetAddresstoIpString(struct addrinfo *res, char *ip,
const int ipsize) {
if (inet_ntop(res->ai_family,
&((struct sockaddr_in *)res->ai_addr)->sin_addr, ip,
ipsize) != NULL) {
freeaddrinfo(res);
return 0;
}
LOG(logERROR) << "Could not convert internet address to ip string";
return 1;
}
void ZmqSocket::PrintError() {
switch (errno) {
case EINVAL:
LOG(logERROR) << "The socket type/option or value/endpoint supplied is "
"invalid (zmq)";
break;
case EAGAIN:
LOG(logERROR) << "Non-blocking mode was requested and the message "
"cannot be sent/available at the moment (zmq)";
break;
case ENOTSUP:
LOG(logERROR) << "The zmq_send()/zmq_msg_recv() operation is not "
"supported by this socket type (zmq)";
break;
case EFSM:
LOG(logERROR) << "The zmq_send()/zmq_msg_recv() unavailable now as "
"socket in inappropriate state (eg. ZMQ_REP). Look up "
"messaging patterns (zmq)";
break;
case EFAULT:
LOG(logERROR) << "The provided context/message is invalid (zmq)";
break;
case EMFILE:
LOG(logERROR) << "The limit on the total number of open ØMQ sockets "
"has been reached (zmq)";
break;
case EPROTONOSUPPORT:
LOG(logERROR)
<< "The requested transport protocol is not supported (zmq)";
break;
case ENOCOMPATPROTO:
LOG(logERROR) << "The requested transport protocol is not compatible "
"with the socket type (zmq)";
break;
case EADDRINUSE:
LOG(logERROR) << "The requested address is already in use (zmq)";
break;
case EADDRNOTAVAIL:
LOG(logERROR) << "The requested address was not local (zmq)";
break;
case ENODEV:
LOG(logERROR)
<< "The requested address specifies a nonexistent interface (zmq)";
break;
case ETERM:
LOG(logERROR) << "The ØMQ context associated with the specified socket "
"was terminated (zmq)";
break;
case ENOTSOCK:
LOG(logERROR) << "The provided socket was invalid (zmq)";
break;
case EINTR:
LOG(logERROR)
<< "The operation was interrupted by delivery of a signal (zmq)";
break;
case EMTHREAD:
LOG(logERROR)
<< "No I/O thread is available to accomplish the task (zmq)";
break;
default:
LOG(logERROR) << "Unknown socket error (zmq)";
break;
}
}
int ZmqSocket::ReceiveData(const int index, char *buf, const int size) {
zmq_msg_t message;
zmq_msg_init(&message);
int length = ReceiveMessage(index, message);
if (length == size) {
memcpy(buf, (char *)zmq_msg_data(&message), size);
} else if (length < size) {
memcpy(buf, (char *)zmq_msg_data(&message), length);
memset(buf + length, 0xFF, size - length);
} else {
LOG(logERROR) << "Received weird packet size " << length
<< " for socket " << index;
memset(buf, 0xFF, size);
}
zmq_msg_close(&message);
return length;
}
int ZmqSocket::ParseHeader(const int index, int length, char *buff,
Document &document, bool &dummy, uint32_t version) {
if (document.Parse(buff, length).HasParseError()) {
LOG(logERROR) << index << " Could not parse. len:" << length
<< ": Message:" << buff;
fflush(stdout);
// char* buf = (char*) zmq_msg_data (&message);
for (int i = 0; i < length; ++i) {
cprintf(RED, "%02x ", buff[i]);
}
printf("\n");
fflush(stdout);
return 0;
}
if (document["jsonversion"].GetUint() != version) {
LOG(logERROR) << "version mismatch. required " << version << ", got "
<< document["jsonversion"].GetUint();
return 0;
}
dummy = false;
int temp = document["data"].GetUint();
dummy = temp ? false : true;
return 1;
}
int ZmqSocket::ReceiveHeader(const int index, Document &document,
uint32_t version) {
std::vector<char> buffer(MAX_STR_LENGTH);
int len =
zmq_recv(sockfd.socketDescriptor, buffer.data(), buffer.size(), 0);
if (len > 0) {
bool dummy = false;
#ifdef ZMQ_DETAIL
cprintf(BLUE, "Header %d [%d] Length: %d Header:%s \n", index, portno,
len, buffer.data());
#endif
if (ParseHeader(index, len, buffer.data(), document, dummy, version)) {
#ifdef ZMQ_DETAIL
cprintf(RED, "Parsed Header %d [%d] Length: %d Header:%s \n", index,
portno, len, buffer.data());
#endif
if (dummy) {
#ifdef ZMQ_DETAIL
cprintf(RED, "%d [%d] Received end of acquisition\n", index,
portno);
#endif
return 0;
}
#ifdef ZMQ_DETAIL
cprintf(GREEN, "%d [%d] data\n", index, portno);
#endif
return 1;
}
}
return 0;
};
int ZmqSocket::ReceiveMessage(const int index, zmq_msg_t &message) {
int length = zmq_msg_recv(&message, sockfd.socketDescriptor, 0);
if (length == -1) {
PrintError();
LOG(logERROR) << "Could not read header for socket " << index;
}
return length;
}
int ZmqSocket::SendData(char *buf, int length) {
if (zmq_send(sockfd.socketDescriptor, buf, length, 0) < 0) {
PrintError();
return 0;
}
return 1;
}
int ZmqSocket::SendHeaderData(
int index, bool dummy, uint32_t jsonversion, uint32_t dynamicrange,
uint64_t fileIndex, uint32_t ndetx, uint32_t ndety, uint32_t npixelsx,
uint32_t npixelsy, uint32_t imageSize, uint64_t acqIndex, uint64_t fIndex,
const char *fname, uint64_t frameNumber, uint32_t expLength,
uint32_t packetNumber, uint64_t bunchId, uint64_t timestamp, uint16_t modId,
uint16_t row, uint16_t column, uint16_t reserved, uint32_t debug,
uint16_t roundRNumber, uint8_t detType, uint8_t version,
int gapPixelsEnable, int flippedDataX, uint32_t quadEnable,
std::string *additionalJsonHeader) {
/** Json Header Format */
const char jsonHeaderFormat[] = "{"
"\"jsonversion\":%u, "
"\"bitmode\":%u, "
"\"fileIndex\":%lu, "
"\"detshape\":[%u, %u], "
"\"shape\":[%u, %u], "
"\"size\":%u, "
"\"acqIndex\":%lu, "
"\"fIndex\":%lu, "
"\"fname\":\"%s\", "
"\"data\": %d, "
"\"frameNumber\":%lu, "
"\"expLength\":%u, "
"\"packetNumber\":%u, "
"\"bunchId\":%lu, "
"\"timestamp\":%lu, "
"\"modId\":%u, "
"\"row\":%u, "
"\"column\":%u, "
"\"reserved\":%u, "
"\"debug\":%u, "
"\"roundRNumber\":%u, "
"\"detType\":%u, "
"\"version\":%u, "
// additional stuff
"\"gappixels\":%u, "
"\"flippedDataX\":%u, "
"\"quad\":%u"
; //"}\n";
char buf[MAX_STR_LENGTH] = "";
sprintf(buf, jsonHeaderFormat, jsonversion, dynamicrange, fileIndex, ndetx,
ndety, npixelsx, npixelsy, imageSize, acqIndex, fIndex,
(fname == NULL) ? "" : fname, dummy ? 0 : 1,
frameNumber, expLength, packetNumber, bunchId, timestamp, modId,
row, column, reserved, debug, roundRNumber, detType, version,
// additional stuff
gapPixelsEnable, flippedDataX, quadEnable);
if (additionalJsonHeader && !((*additionalJsonHeader).empty())) {
strcat(buf, ", ");
strcat(buf, (*additionalJsonHeader).c_str());
}
strcat(buf, "}\n");
int length = strlen(buf);
#ifdef VERBOSE
// if(!index)
cprintf(BLUE, "%d : Streamer: buf: %s\n", index, buf);
#endif
if (zmq_send(sockfd.socketDescriptor, buf, length,
dummy ? 0 : ZMQ_SNDMORE) < 0) {
PrintError();
return 0;
}
#ifdef VERBOSE
cprintf(GREEN, "[%u] send header data\n", portno);
#endif
return 1;
}
//Nested class to do RAII handling of socket descriptors
ZmqSocket::mySocketDescriptors::mySocketDescriptors()
: server(false), contextDescriptor(0), socketDescriptor(0){};
ZmqSocket::mySocketDescriptors::~mySocketDescriptors() {
Disconnect();
Close();
}
void ZmqSocket::mySocketDescriptors::Disconnect() {
if (server)
zmq_unbind(socketDescriptor, serverAddress);
else
zmq_disconnect(socketDescriptor, serverAddress);
};
void ZmqSocket::mySocketDescriptors::Close() {
if (socketDescriptor != NULL) {
zmq_close(socketDescriptor);
socketDescriptor = NULL;
}
if (contextDescriptor != NULL) {
zmq_ctx_destroy(contextDescriptor);
contextDescriptor = NULL;
}
};