mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-05-06 21:00:02 +02:00
cmake 3.9 and removed zmq
This commit is contained in:
parent
274ec27934
commit
9ca831d954
@ -1,6 +1,6 @@
|
||||
# SPDX-License-Identifier: LGPL-3.0-or-other
|
||||
# Copyright (C) 2021 Contributors to the SLS Detector Package
|
||||
cmake_minimum_required(VERSION 3.12)
|
||||
cmake_minimum_required(VERSION 3.9)
|
||||
project(slsDetectorPackage)
|
||||
set(PROJECT_VERSION 6.0.0)
|
||||
|
||||
@ -172,35 +172,35 @@ set(CMAKE_INSTALL_RPATH $ORIGIN)
|
||||
set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE)
|
||||
|
||||
|
||||
set(ZeroMQ_HINT "" CACHE STRING "Hint where ZeroMQ could be found")
|
||||
#Adapted from: https://github.com/zeromq/cppzmq/
|
||||
if (NOT TARGET libzmq)
|
||||
if(ZeroMQ_HINT)
|
||||
message(STATUS "Looking for ZeroMQ in: ${ZeroMQ_HINT}")
|
||||
find_package(ZeroMQ 4
|
||||
NO_DEFAULT_PATH
|
||||
HINTS ${ZeroMQ_DIR}
|
||||
)
|
||||
else()
|
||||
find_package(ZeroMQ 4 QUIET)
|
||||
endif()
|
||||
# set(ZeroMQ_HINT "" CACHE STRING "Hint where ZeroMQ could be found")
|
||||
# #Adapted from: https://github.com/zeromq/cppzmq/
|
||||
# if (NOT TARGET libzmq)
|
||||
# if(ZeroMQ_HINT)
|
||||
# message(STATUS "Looking for ZeroMQ in: ${ZeroMQ_HINT}")
|
||||
# find_package(ZeroMQ 4
|
||||
# NO_DEFAULT_PATH
|
||||
# HINTS ${ZeroMQ_DIR}
|
||||
# )
|
||||
# else()
|
||||
# find_package(ZeroMQ 4 QUIET)
|
||||
# endif()
|
||||
|
||||
# libzmq autotools install: fallback to pkg-config
|
||||
if(NOT ZeroMQ_FOUND)
|
||||
message(STATUS "CMake libzmq package not found, trying again with pkg-config (normal install of zeromq)")
|
||||
list (APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/libzmq-pkg-config)
|
||||
find_package(ZeroMQ 4 REQUIRED)
|
||||
endif()
|
||||
# # libzmq autotools install: fallback to pkg-config
|
||||
# if(NOT ZeroMQ_FOUND)
|
||||
# message(STATUS "CMake libzmq package not found, trying again with pkg-config (normal install of zeromq)")
|
||||
# list (APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/libzmq-pkg-config)
|
||||
# find_package(ZeroMQ 4 REQUIRED)
|
||||
# endif()
|
||||
|
||||
# TODO "REQUIRED" above should already cause a fatal failure if not found, but this doesn't seem to work
|
||||
if(NOT ZeroMQ_FOUND)
|
||||
message(FATAL_ERROR "ZeroMQ was not found, neither as a CMake package nor via pkg-config")
|
||||
endif()
|
||||
# # TODO "REQUIRED" above should already cause a fatal failure if not found, but this doesn't seem to work
|
||||
# if(NOT ZeroMQ_FOUND)
|
||||
# message(FATAL_ERROR "ZeroMQ was not found, neither as a CMake package nor via pkg-config")
|
||||
# endif()
|
||||
|
||||
if (ZeroMQ_FOUND AND NOT TARGET libzmq)
|
||||
message(FATAL_ERROR "ZeroMQ version not supported!")
|
||||
endif()
|
||||
endif()
|
||||
# if (ZeroMQ_FOUND AND NOT TARGET libzmq)
|
||||
# message(FATAL_ERROR "ZeroMQ version not supported!")
|
||||
# endif()
|
||||
# endif()
|
||||
|
||||
if (SLS_USE_TESTS)
|
||||
enable_testing()
|
||||
|
@ -80,7 +80,7 @@ target_include_directories(slsSupportObject
|
||||
target_link_libraries(slsSupportObject
|
||||
PUBLIC
|
||||
slsProjectOptions
|
||||
libzmq
|
||||
# libzmq
|
||||
rapidjson
|
||||
PRIVATE
|
||||
slsProjectWarnings
|
||||
|
@ -10,249 +10,249 @@
|
||||
#include <string.h>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include <zmq.h>
|
||||
// #include <zmq.h>
|
||||
|
||||
using namespace rapidjson;
|
||||
ZmqSocket::ZmqSocket(const char *const hostname_or_ip,
|
||||
const uint32_t portnumber)
|
||||
: portno(portnumber), sockfd(false) {
|
||||
// Extra check that throws if conversion fails, could be removed
|
||||
auto ipstr = sls::HostnameToIp(hostname_or_ip).str();
|
||||
std::ostringstream oss;
|
||||
oss << "tcp://" << ipstr << ":" << portno;
|
||||
sockfd.serverAddress = oss.str();
|
||||
LOG(logDEBUG) << "zmq address: " << sockfd.serverAddress;
|
||||
// auto ipstr = sls::HostnameToIp(hostname_or_ip).str();
|
||||
// std::ostringstream oss;
|
||||
// oss << "tcp://" << ipstr << ":" << portno;
|
||||
// sockfd.serverAddress = oss.str();
|
||||
// LOG(logDEBUG) << "zmq address: " << sockfd.serverAddress;
|
||||
|
||||
// create context
|
||||
sockfd.contextDescriptor = zmq_ctx_new();
|
||||
if (sockfd.contextDescriptor == nullptr)
|
||||
throw sls::ZmqSocketError("Could not create contextDescriptor");
|
||||
// // create context
|
||||
// sockfd.contextDescriptor = zmq_ctx_new();
|
||||
// if (sockfd.contextDescriptor == nullptr)
|
||||
// throw sls::ZmqSocketError("Could not create contextDescriptor");
|
||||
|
||||
// create subscriber
|
||||
sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_SUB);
|
||||
if (sockfd.socketDescriptor == nullptr) {
|
||||
PrintError();
|
||||
throw sls::ZmqSocketError("Could not create socket");
|
||||
}
|
||||
// // create subscriber
|
||||
// sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_SUB);
|
||||
// if (sockfd.socketDescriptor == nullptr) {
|
||||
// PrintError();
|
||||
// throw sls::ZmqSocketError("Could not create socket");
|
||||
// }
|
||||
|
||||
// Socket Options provided above
|
||||
// an empty string implies receiving any messages
|
||||
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) {
|
||||
PrintError();
|
||||
throw sls::ZmqSocketError("Could set socket opt");
|
||||
}
|
||||
// ZMQ_LINGER default is already -1 means no messages discarded. use this
|
||||
// options if optimizing required ZMQ_SNDHWM default is 0 means no limit.
|
||||
// use this to optimize if optimizing required eg. int value = -1;
|
||||
const int value = 0;
|
||||
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_LINGER, &value,
|
||||
sizeof(value))) {
|
||||
PrintError();
|
||||
throw sls::ZmqSocketError("Could not set ZMQ_LINGER");
|
||||
}
|
||||
LOG(logDEBUG) << "Default receive high water mark:"
|
||||
<< GetReceiveHighWaterMark();
|
||||
// // Socket Options provided above
|
||||
// // an empty string implies receiving any messages
|
||||
// if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) {
|
||||
// PrintError();
|
||||
// throw sls::ZmqSocketError("Could set socket opt");
|
||||
// }
|
||||
// // ZMQ_LINGER default is already -1 means no messages discarded. use this
|
||||
// // options if optimizing required ZMQ_SNDHWM default is 0 means no limit.
|
||||
// // use this to optimize if optimizing required eg. int value = -1;
|
||||
// const int value = 0;
|
||||
// if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_LINGER, &value,
|
||||
// sizeof(value))) {
|
||||
// PrintError();
|
||||
// throw sls::ZmqSocketError("Could not set ZMQ_LINGER");
|
||||
// }
|
||||
// LOG(logDEBUG) << "Default receive high water mark:"
|
||||
// << GetReceiveHighWaterMark();
|
||||
}
|
||||
|
||||
ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip)
|
||||
: portno(portnumber), sockfd(true) {
|
||||
// create context
|
||||
sockfd.contextDescriptor = zmq_ctx_new();
|
||||
if (sockfd.contextDescriptor == nullptr)
|
||||
throw sls::ZmqSocketError("Could not create contextDescriptor");
|
||||
// sockfd.contextDescriptor = zmq_ctx_new();
|
||||
// if (sockfd.contextDescriptor == nullptr)
|
||||
// throw sls::ZmqSocketError("Could not create contextDescriptor");
|
||||
|
||||
// create publisher
|
||||
sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_PUB);
|
||||
if (sockfd.socketDescriptor == nullptr) {
|
||||
PrintError();
|
||||
throw sls::ZmqSocketError("Could not create socket");
|
||||
}
|
||||
LOG(logDEBUG) << "Default send high water mark:" << GetSendHighWaterMark();
|
||||
// // create publisher
|
||||
// sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_PUB);
|
||||
// if (sockfd.socketDescriptor == nullptr) {
|
||||
// PrintError();
|
||||
// throw sls::ZmqSocketError("Could not create socket");
|
||||
// }
|
||||
// LOG(logDEBUG) << "Default send high water mark:" << GetSendHighWaterMark();
|
||||
|
||||
// construct address, can be refactored with libfmt
|
||||
std::ostringstream oss;
|
||||
oss << "tcp://" << ethip << ":" << portno;
|
||||
sockfd.serverAddress = oss.str();
|
||||
LOG(logDEBUG) << "zmq address: " << sockfd.serverAddress;
|
||||
// // construct address, can be refactored with libfmt
|
||||
// std::ostringstream oss;
|
||||
// oss << "tcp://" << ethip << ":" << portno;
|
||||
// sockfd.serverAddress = oss.str();
|
||||
// LOG(logDEBUG) << "zmq address: " << sockfd.serverAddress;
|
||||
|
||||
// bind address
|
||||
if (zmq_bind(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) {
|
||||
PrintError();
|
||||
throw sls::ZmqSocketError("Could not bind socket");
|
||||
}
|
||||
// sleep to allow a slow-joiner
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
// // bind address
|
||||
// if (zmq_bind(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) {
|
||||
// PrintError();
|
||||
// throw sls::ZmqSocketError("Could not bind socket");
|
||||
// }
|
||||
// // sleep to allow a slow-joiner
|
||||
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
};
|
||||
|
||||
int ZmqSocket::GetSendHighWaterMark() {
|
||||
int value = 0;
|
||||
size_t value_size = sizeof(value);
|
||||
if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_SNDHWM, &value,
|
||||
&value_size)) {
|
||||
PrintError();
|
||||
throw sls::ZmqSocketError("Could not get ZMQ_SNDHWM");
|
||||
}
|
||||
// size_t value_size = sizeof(value);
|
||||
// if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_SNDHWM, &value,
|
||||
// &value_size)) {
|
||||
// PrintError();
|
||||
// throw sls::ZmqSocketError("Could not get ZMQ_SNDHWM");
|
||||
// }
|
||||
return value;
|
||||
}
|
||||
|
||||
void ZmqSocket::SetSendHighWaterMark(int limit) {
|
||||
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SNDHWM, &limit,
|
||||
sizeof(limit))) {
|
||||
PrintError();
|
||||
throw sls::ZmqSocketError("Could not set ZMQ_SNDHWM");
|
||||
}
|
||||
// if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SNDHWM, &limit,
|
||||
// sizeof(limit))) {
|
||||
// PrintError();
|
||||
// throw sls::ZmqSocketError("Could not set ZMQ_SNDHWM");
|
||||
// }
|
||||
}
|
||||
|
||||
int ZmqSocket::GetReceiveHighWaterMark() {
|
||||
int value = 0;
|
||||
size_t value_size = sizeof(value);
|
||||
if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &value,
|
||||
&value_size)) {
|
||||
PrintError();
|
||||
throw sls::ZmqSocketError("Could not get ZMQ_SNDHWM");
|
||||
}
|
||||
// size_t value_size = sizeof(value);
|
||||
// if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &value,
|
||||
// &value_size)) {
|
||||
// PrintError();
|
||||
// throw sls::ZmqSocketError("Could not get ZMQ_SNDHWM");
|
||||
// }
|
||||
return value;
|
||||
}
|
||||
|
||||
void ZmqSocket::SetReceiveHighWaterMark(int limit) {
|
||||
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &limit,
|
||||
sizeof(limit))) {
|
||||
PrintError();
|
||||
throw sls::ZmqSocketError("Could not set ZMQ_SNDHWM");
|
||||
}
|
||||
// if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &limit,
|
||||
// sizeof(limit))) {
|
||||
// PrintError();
|
||||
// throw sls::ZmqSocketError("Could not set ZMQ_SNDHWM");
|
||||
// }
|
||||
}
|
||||
|
||||
int ZmqSocket::Connect() {
|
||||
if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) {
|
||||
PrintError();
|
||||
return 1;
|
||||
}
|
||||
// if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) {
|
||||
// PrintError();
|
||||
// return 1;
|
||||
// }
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ZmqSocket::SendHeader(int index, zmqHeader header) {
|
||||
|
||||
/** Json Header Format */
|
||||
const char jsonHeaderFormat[] = "{"
|
||||
"\"jsonversion\":%u, "
|
||||
"\"bitmode\":%u, "
|
||||
"\"fileIndex\":%lu, "
|
||||
"\"detshape\":[%u, %u], "
|
||||
"\"shape\":[%u, %u], "
|
||||
"\"size\":%u, "
|
||||
"\"acqIndex\":%lu, "
|
||||
"\"frameIndex\":%lu, "
|
||||
"\"progress\":%lf, "
|
||||
"\"fname\":\"%s\", "
|
||||
"\"data\": %d, "
|
||||
"\"completeImage\": %d, "
|
||||
// /** Json Header Format */
|
||||
// const char jsonHeaderFormat[] = "{"
|
||||
// "\"jsonversion\":%u, "
|
||||
// "\"bitmode\":%u, "
|
||||
// "\"fileIndex\":%lu, "
|
||||
// "\"detshape\":[%u, %u], "
|
||||
// "\"shape\":[%u, %u], "
|
||||
// "\"size\":%u, "
|
||||
// "\"acqIndex\":%lu, "
|
||||
// "\"frameIndex\":%lu, "
|
||||
// "\"progress\":%lf, "
|
||||
// "\"fname\":\"%s\", "
|
||||
// "\"data\": %d, "
|
||||
// "\"completeImage\": %d, "
|
||||
|
||||
"\"frameNumber\":%lu, "
|
||||
"\"expLength\":%u, "
|
||||
"\"packetNumber\":%u, "
|
||||
"\"bunchId\":%lu, "
|
||||
"\"timestamp\":%lu, "
|
||||
"\"modId\":%u, "
|
||||
"\"row\":%u, "
|
||||
"\"column\":%u, "
|
||||
"\"reserved\":%u, "
|
||||
"\"debug\":%u, "
|
||||
"\"roundRNumber\":%u, "
|
||||
"\"detType\":%u, "
|
||||
"\"version\":%u, "
|
||||
// "\"frameNumber\":%lu, "
|
||||
// "\"expLength\":%u, "
|
||||
// "\"packetNumber\":%u, "
|
||||
// "\"bunchId\":%lu, "
|
||||
// "\"timestamp\":%lu, "
|
||||
// "\"modId\":%u, "
|
||||
// "\"row\":%u, "
|
||||
// "\"column\":%u, "
|
||||
// "\"reserved\":%u, "
|
||||
// "\"debug\":%u, "
|
||||
// "\"roundRNumber\":%u, "
|
||||
// "\"detType\":%u, "
|
||||
// "\"version\":%u, "
|
||||
|
||||
// additional stuff
|
||||
"\"flipRows\":%u, "
|
||||
"\"quad\":%u"
|
||||
// // additional stuff
|
||||
// "\"flipRows\":%u, "
|
||||
// "\"quad\":%u"
|
||||
|
||||
; //"}\n";
|
||||
memset(header_buffer.get(), '\0', MAX_STR_LENGTH); // TODO! Do we need this
|
||||
sprintf(header_buffer.get(), jsonHeaderFormat, header.jsonversion,
|
||||
header.dynamicRange, header.fileIndex, header.ndetx, header.ndety,
|
||||
header.npixelsx, header.npixelsy, header.imageSize, header.acqIndex,
|
||||
header.frameIndex, header.progress, header.fname.c_str(),
|
||||
header.data ? 1 : 0, header.completeImage ? 1 : 0,
|
||||
// ; //"}\n";
|
||||
// memset(header_buffer.get(), '\0', MAX_STR_LENGTH); // TODO! Do we need this
|
||||
// sprintf(header_buffer.get(), jsonHeaderFormat, header.jsonversion,
|
||||
// header.dynamicRange, header.fileIndex, header.ndetx, header.ndety,
|
||||
// header.npixelsx, header.npixelsy, header.imageSize, header.acqIndex,
|
||||
// header.frameIndex, header.progress, header.fname.c_str(),
|
||||
// header.data ? 1 : 0, header.completeImage ? 1 : 0,
|
||||
|
||||
header.frameNumber, header.expLength, header.packetNumber,
|
||||
header.bunchId, header.timestamp, header.modId, header.row,
|
||||
header.column, header.reserved, header.debug, header.roundRNumber,
|
||||
header.detType, header.version,
|
||||
// header.frameNumber, header.expLength, header.packetNumber,
|
||||
// header.bunchId, header.timestamp, header.modId, header.row,
|
||||
// header.column, header.reserved, header.debug, header.roundRNumber,
|
||||
// header.detType, header.version,
|
||||
|
||||
// additional stuff
|
||||
header.flipRows, header.quad);
|
||||
// // additional stuff
|
||||
// header.flipRows, header.quad);
|
||||
|
||||
if (!header.addJsonHeader.empty()) {
|
||||
strcat(header_buffer.get(), ", ");
|
||||
strcat(header_buffer.get(), "\"addJsonHeader\": {");
|
||||
for (auto it = header.addJsonHeader.begin();
|
||||
it != header.addJsonHeader.end(); ++it) {
|
||||
if (it != header.addJsonHeader.begin()) {
|
||||
strcat(header_buffer.get(), ", ");
|
||||
}
|
||||
strcat(header_buffer.get(), "\"");
|
||||
strcat(header_buffer.get(), it->first.c_str());
|
||||
strcat(header_buffer.get(), "\":\"");
|
||||
strcat(header_buffer.get(), it->second.c_str());
|
||||
strcat(header_buffer.get(), "\"");
|
||||
}
|
||||
strcat(header_buffer.get(), " } ");
|
||||
}
|
||||
// if (!header.addJsonHeader.empty()) {
|
||||
// strcat(header_buffer.get(), ", ");
|
||||
// strcat(header_buffer.get(), "\"addJsonHeader\": {");
|
||||
// for (auto it = header.addJsonHeader.begin();
|
||||
// it != header.addJsonHeader.end(); ++it) {
|
||||
// if (it != header.addJsonHeader.begin()) {
|
||||
// strcat(header_buffer.get(), ", ");
|
||||
// }
|
||||
// strcat(header_buffer.get(), "\"");
|
||||
// strcat(header_buffer.get(), it->first.c_str());
|
||||
// strcat(header_buffer.get(), "\":\"");
|
||||
// strcat(header_buffer.get(), it->second.c_str());
|
||||
// strcat(header_buffer.get(), "\"");
|
||||
// }
|
||||
// strcat(header_buffer.get(), " } ");
|
||||
// }
|
||||
|
||||
strcat(header_buffer.get(), "}\n");
|
||||
int length = strlen(header_buffer.get());
|
||||
// strcat(header_buffer.get(), "}\n");
|
||||
// int length = strlen(header_buffer.get());
|
||||
|
||||
#ifdef VERBOSE
|
||||
// if(!index)
|
||||
cprintf(BLUE, "%d : Streamer: buf: %s\n", index, buf);
|
||||
#endif
|
||||
// #ifdef VERBOSE
|
||||
// // if(!index)
|
||||
// cprintf(BLUE, "%d : Streamer: buf: %s\n", index, buf);
|
||||
// #endif
|
||||
|
||||
if (zmq_send(sockfd.socketDescriptor, header_buffer.get(), length,
|
||||
header.data ? ZMQ_SNDMORE : 0) < 0) {
|
||||
PrintError();
|
||||
return 0;
|
||||
}
|
||||
#ifdef VERBOSE
|
||||
cprintf(GREEN, "[%u] send header data\n", portno);
|
||||
#endif
|
||||
// if (zmq_send(sockfd.socketDescriptor, header_buffer.get(), length,
|
||||
// header.data ? ZMQ_SNDMORE : 0) < 0) {
|
||||
// PrintError();
|
||||
// return 0;
|
||||
// }
|
||||
// #ifdef VERBOSE
|
||||
// cprintf(GREEN, "[%u] send header data\n", portno);
|
||||
// #endif
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ZmqSocket::SendData(char *buf, int length) {
|
||||
if (zmq_send(sockfd.socketDescriptor, buf, length, 0) < 0) {
|
||||
PrintError();
|
||||
return 0;
|
||||
}
|
||||
// if (zmq_send(sockfd.socketDescriptor, buf, length, 0) < 0) {
|
||||
// PrintError();
|
||||
// return 0;
|
||||
// }
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ZmqSocket::ReceiveHeader(const int index, zmqHeader &zHeader,
|
||||
uint32_t version) {
|
||||
const int bytes_received = zmq_recv(sockfd.socketDescriptor,
|
||||
header_buffer.get(), MAX_STR_LENGTH, 0);
|
||||
if (bytes_received > 0) {
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(BLUE, "Header %d [%d] Length: %d Header:%s \n", index, portno,
|
||||
bytes_received, buffer.data());
|
||||
#endif
|
||||
if (ParseHeader(index, bytes_received, header_buffer.get(), zHeader,
|
||||
version)) {
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(RED, "Parsed Header %d [%d] Length: %d Header:%s \n", index,
|
||||
portno, bytes_received, buffer.data());
|
||||
#endif
|
||||
if (!zHeader.data) {
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(RED, "%d [%d] Received end of acquisition\n", index,
|
||||
portno);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(GREEN, "%d [%d] data\n", index, portno);
|
||||
#endif
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
// const int bytes_received = zmq_recv(sockfd.socketDescriptor,
|
||||
// header_buffer.get(), MAX_STR_LENGTH, 0);
|
||||
// if (bytes_received > 0) {
|
||||
// #ifdef ZMQ_DETAIL
|
||||
// cprintf(BLUE, "Header %d [%d] Length: %d Header:%s \n", index, portno,
|
||||
// bytes_received, buffer.data());
|
||||
// #endif
|
||||
// if (ParseHeader(index, bytes_received, header_buffer.get(), zHeader,
|
||||
// version)) {
|
||||
// #ifdef ZMQ_DETAIL
|
||||
// cprintf(RED, "Parsed Header %d [%d] Length: %d Header:%s \n", index,
|
||||
// portno, bytes_received, buffer.data());
|
||||
// #endif
|
||||
// if (!zHeader.data) {
|
||||
// #ifdef ZMQ_DETAIL
|
||||
// cprintf(RED, "%d [%d] Received end of acquisition\n", index,
|
||||
// portno);
|
||||
// #endif
|
||||
// return 0;
|
||||
// }
|
||||
// #ifdef ZMQ_DETAIL
|
||||
// cprintf(GREEN, "%d [%d] data\n", index, portno);
|
||||
// #endif
|
||||
// return 1;
|
||||
// }
|
||||
// }
|
||||
return 0;
|
||||
};
|
||||
|
||||
@ -323,96 +323,96 @@ int ZmqSocket::ParseHeader(const int index, int length, char *buff,
|
||||
}
|
||||
|
||||
int ZmqSocket::ReceiveData(const int index, char *buf, const int size) {
|
||||
zmq_msg_t message;
|
||||
zmq_msg_init(&message);
|
||||
int length = ReceiveMessage(index, message);
|
||||
if (length == size) {
|
||||
memcpy(buf, (char *)zmq_msg_data(&message), size);
|
||||
} else if (length < size) {
|
||||
memcpy(buf, (char *)zmq_msg_data(&message), length);
|
||||
memset(buf + length, 0xFF, size - length);
|
||||
} else {
|
||||
LOG(logERROR) << "Received weird packet size " << length
|
||||
<< " for socket " << index;
|
||||
memset(buf, 0xFF, size);
|
||||
}
|
||||
// zmq_msg_t message;
|
||||
// zmq_msg_init(&message);
|
||||
// int length = ReceiveMessage(index, message);
|
||||
// if (length == size) {
|
||||
// memcpy(buf, (char *)zmq_msg_data(&message), size);
|
||||
// } else if (length < size) {
|
||||
// memcpy(buf, (char *)zmq_msg_data(&message), length);
|
||||
// memset(buf + length, 0xFF, size - length);
|
||||
// } else {
|
||||
// LOG(logERROR) << "Received weird packet size " << length
|
||||
// << " for socket " << index;
|
||||
// memset(buf, 0xFF, size);
|
||||
// }
|
||||
|
||||
zmq_msg_close(&message);
|
||||
return length;
|
||||
// zmq_msg_close(&message);
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ZmqSocket::ReceiveMessage(const int index, zmq_msg_t &message) {
|
||||
int length = zmq_msg_recv(&message, sockfd.socketDescriptor, 0);
|
||||
if (length == -1) {
|
||||
PrintError();
|
||||
LOG(logERROR) << "Could not read header for socket " << index;
|
||||
}
|
||||
return length;
|
||||
// int length = zmq_msg_recv(&message, sockfd.socketDescriptor, 0);
|
||||
// if (length == -1) {
|
||||
// PrintError();
|
||||
// LOG(logERROR) << "Could not read header for socket " << index;
|
||||
// }
|
||||
return 1;
|
||||
}
|
||||
|
||||
void ZmqSocket::PrintError() {
|
||||
switch (errno) {
|
||||
case EINVAL:
|
||||
LOG(logERROR) << "The socket type/option or value/endpoint supplied is "
|
||||
"invalid (zmq)";
|
||||
break;
|
||||
case EAGAIN:
|
||||
LOG(logERROR) << "Non-blocking mode was requested and the message "
|
||||
"cannot be sent/available at the moment (zmq)";
|
||||
break;
|
||||
case ENOTSUP:
|
||||
LOG(logERROR) << "The zmq_send()/zmq_msg_recv() operation is not "
|
||||
"supported by this socket type (zmq)";
|
||||
break;
|
||||
case EFSM:
|
||||
LOG(logERROR) << "The zmq_send()/zmq_msg_recv() unavailable now as "
|
||||
"socket in inappropriate state (eg. ZMQ_REP). Look up "
|
||||
"messaging patterns (zmq)";
|
||||
break;
|
||||
case EFAULT:
|
||||
LOG(logERROR) << "The provided context/message is invalid (zmq)";
|
||||
break;
|
||||
case EMFILE:
|
||||
LOG(logERROR) << "The limit on the total number of open ØMQ sockets "
|
||||
"has been reached (zmq)";
|
||||
break;
|
||||
case EPROTONOSUPPORT:
|
||||
LOG(logERROR)
|
||||
<< "The requested transport protocol is not supported (zmq)";
|
||||
break;
|
||||
case ENOCOMPATPROTO:
|
||||
LOG(logERROR) << "The requested transport protocol is not compatible "
|
||||
"with the socket type (zmq)";
|
||||
break;
|
||||
case EADDRINUSE:
|
||||
LOG(logERROR) << "The requested address is already in use (zmq)";
|
||||
break;
|
||||
case EADDRNOTAVAIL:
|
||||
LOG(logERROR) << "The requested address was not local (zmq)";
|
||||
break;
|
||||
case ENODEV:
|
||||
LOG(logERROR)
|
||||
<< "The requested address specifies a nonexistent interface (zmq)";
|
||||
break;
|
||||
case ETERM:
|
||||
LOG(logERROR) << "The ØMQ context associated with the specified socket "
|
||||
"was terminated (zmq)";
|
||||
break;
|
||||
case ENOTSOCK:
|
||||
LOG(logERROR) << "The provided socket was invalid (zmq)";
|
||||
break;
|
||||
case EINTR:
|
||||
LOG(logERROR)
|
||||
<< "The operation was interrupted by delivery of a signal (zmq)";
|
||||
break;
|
||||
case EMTHREAD:
|
||||
LOG(logERROR)
|
||||
<< "No I/O thread is available to accomplish the task (zmq)";
|
||||
break;
|
||||
default:
|
||||
LOG(logERROR) << "Unknown socket error (zmq)";
|
||||
break;
|
||||
}
|
||||
// switch (errno) {
|
||||
// case EINVAL:
|
||||
// LOG(logERROR) << "The socket type/option or value/endpoint supplied is "
|
||||
// "invalid (zmq)";
|
||||
// break;
|
||||
// case EAGAIN:
|
||||
// LOG(logERROR) << "Non-blocking mode was requested and the message "
|
||||
// "cannot be sent/available at the moment (zmq)";
|
||||
// break;
|
||||
// case ENOTSUP:
|
||||
// LOG(logERROR) << "The zmq_send()/zmq_msg_recv() operation is not "
|
||||
// "supported by this socket type (zmq)";
|
||||
// break;
|
||||
// case EFSM:
|
||||
// LOG(logERROR) << "The zmq_send()/zmq_msg_recv() unavailable now as "
|
||||
// "socket in inappropriate state (eg. ZMQ_REP). Look up "
|
||||
// "messaging patterns (zmq)";
|
||||
// break;
|
||||
// case EFAULT:
|
||||
// LOG(logERROR) << "The provided context/message is invalid (zmq)";
|
||||
// break;
|
||||
// case EMFILE:
|
||||
// LOG(logERROR) << "The limit on the total number of open ØMQ sockets "
|
||||
// "has been reached (zmq)";
|
||||
// break;
|
||||
// case EPROTONOSUPPORT:
|
||||
// LOG(logERROR)
|
||||
// << "The requested transport protocol is not supported (zmq)";
|
||||
// break;
|
||||
// case ENOCOMPATPROTO:
|
||||
// LOG(logERROR) << "The requested transport protocol is not compatible "
|
||||
// "with the socket type (zmq)";
|
||||
// break;
|
||||
// case EADDRINUSE:
|
||||
// LOG(logERROR) << "The requested address is already in use (zmq)";
|
||||
// break;
|
||||
// case EADDRNOTAVAIL:
|
||||
// LOG(logERROR) << "The requested address was not local (zmq)";
|
||||
// break;
|
||||
// case ENODEV:
|
||||
// LOG(logERROR)
|
||||
// << "The requested address specifies a nonexistent interface (zmq)";
|
||||
// break;
|
||||
// case ETERM:
|
||||
// LOG(logERROR) << "The ØMQ context associated with the specified socket "
|
||||
// "was terminated (zmq)";
|
||||
// break;
|
||||
// case ENOTSOCK:
|
||||
// LOG(logERROR) << "The provided socket was invalid (zmq)";
|
||||
// break;
|
||||
// case EINTR:
|
||||
// LOG(logERROR)
|
||||
// << "The operation was interrupted by delivery of a signal (zmq)";
|
||||
// break;
|
||||
// case EMTHREAD:
|
||||
// LOG(logERROR)
|
||||
// << "No I/O thread is available to accomplish the task (zmq)";
|
||||
// break;
|
||||
// default:
|
||||
// LOG(logERROR) << "Unknown socket error (zmq)";
|
||||
// break;
|
||||
// }
|
||||
}
|
||||
|
||||
// Nested class to do RAII handling of socket descriptors
|
||||
@ -423,19 +423,19 @@ ZmqSocket::mySocketDescriptors::~mySocketDescriptors() {
|
||||
Close();
|
||||
}
|
||||
void ZmqSocket::mySocketDescriptors::Disconnect() {
|
||||
if (server)
|
||||
zmq_unbind(socketDescriptor, serverAddress.c_str());
|
||||
else
|
||||
zmq_disconnect(socketDescriptor, serverAddress.c_str());
|
||||
// if (server)
|
||||
// zmq_unbind(socketDescriptor, serverAddress.c_str());
|
||||
// else
|
||||
// zmq_disconnect(socketDescriptor, serverAddress.c_str());
|
||||
};
|
||||
void ZmqSocket::mySocketDescriptors::Close() {
|
||||
if (socketDescriptor != nullptr) {
|
||||
zmq_close(socketDescriptor);
|
||||
socketDescriptor = nullptr;
|
||||
}
|
||||
// if (socketDescriptor != nullptr) {
|
||||
// zmq_close(socketDescriptor);
|
||||
// socketDescriptor = nullptr;
|
||||
// }
|
||||
|
||||
if (contextDescriptor != nullptr) {
|
||||
zmq_ctx_destroy(contextDescriptor);
|
||||
contextDescriptor = nullptr;
|
||||
}
|
||||
// if (contextDescriptor != nullptr) {
|
||||
// zmq_ctx_destroy(contextDescriptor);
|
||||
// contextDescriptor = nullptr;
|
||||
// }
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user