zmq fixed to have additional json headers using rx_zmqjsonheader

This commit is contained in:
2018-04-09 16:33:14 +02:00
parent ccdc7d22e9
commit 2bd8e6c166
14 changed files with 195 additions and 10 deletions

View File

@ -26,7 +26,7 @@ pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER;
bool DataStreamer::SilentMode(false);
DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, int* sEnable, uint64_t* fi, int* fd) :
DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, int* sEnable, uint64_t* fi, int* fd, char* ajh) :
ThreadObject(NumberofDataStreamers),
generalData(0),
fifo(f),
@ -39,7 +39,8 @@ DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, int* sEnable, uint64_t* fi, i
firstAcquisitionIndex(0),
firstMeasurementIndex(0),
completeBuffer(0),
flippedData(fd)
flippedData(fd),
additionJsonHeader(ajh)
{
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
@ -287,8 +288,9 @@ int DataStreamer::SendHeader(sls_detector_header* header, uint32_t size, uint32_
header->modId, header->xCoord, header->yCoord, header->zCoord,
header->debug, header->roundRNumber,
header->detType, header->version,
flippedData
);
flippedData,
additionJsonHeader
);
}

View File

@ -83,6 +83,7 @@ void UDPBaseImplementation::initializeMembers(){
dataStreamEnable = false;
streamingPort = 0;
memset(streamingSrcIP, 0, sizeof(streamingSrcIP));
memset(additionalJsonHeader, 0, sizeof(additionalJsonHeader));
//***receiver parameters***
silentMode = 0;
@ -232,6 +233,16 @@ char *UDPBaseImplementation::getStreamingSourceIP() const{
return output;
}
char *UDPBaseImplementation::getAdditionalJsonHeader() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
char* output = new char[MAX_STR_LENGTH]();
memset(output, 0, MAX_STR_LENGTH);
strcpy(output,additionalJsonHeader);
//freed by calling function
return output;
}
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
@ -597,6 +608,11 @@ void UDPBaseImplementation::setStreamingSourceIP(const char c[]){
FILE_LOG(logINFO) << "Streaming Source IP: " << streamingSrcIP;
}
void UDPBaseImplementation::setAdditionalJsonHeader(const char c[]){
FILE_LOG(logDEBUG) << __AT__ << " starting";
strcpy(additionalJsonHeader, c);
FILE_LOG(logINFO) << "Additional JSON Header: " << additionalJsonHeader;
}
int UDPBaseImplementation::restreamStop() {
FILE_LOG(logERROR) << __AT__ << " doing nothing...";

View File

@ -218,7 +218,7 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
if (enable) {
bool error = false;
for ( int i = 0; i < numThreads; ++i ) {
dataStreamer.push_back(new DataStreamer(fifo[i], &dynamicRange, &shortFrameEnable, &fileIndex, flippedData));
dataStreamer.push_back(new DataStreamer(fifo[i], &dynamicRange, &shortFrameEnable, &fileIndex, flippedData, additionalJsonHeader));
dataStreamer[i]->SetGeneralData(generalData);
if (dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP) == FAIL) {
error = true;

View File

@ -293,6 +293,7 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) {
case F_RECEIVER_STREAMING_SRC_IP: return "F_RECEIVER_STREAMING_SRC_IP";
case F_ENABLE_GAPPIXELS_IN_RECEIVER:return "F_ENABLE_GAPPIXELS_IN_RECEIVER";
case F_RESTREAM_STOP_FROM_RECEIVER: return "F_RESTREAM_STOP_FROM_RECEIVER";
case F_ADDITIONAL_JSON_HEADER: return "F_ADDITIONAL_JSON_HEADER";
default: return "Unknown Function";
}
}
@ -341,6 +342,7 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_RECEIVER_STREAMING_SRC_IP] = &slsReceiverTCPIPInterface::set_streaming_source_ip;
flist[F_ENABLE_GAPPIXELS_IN_RECEIVER] = &slsReceiverTCPIPInterface::enable_gap_pixels;
flist[F_RESTREAM_STOP_FROM_RECEIVER] = &slsReceiverTCPIPInterface::restream_stop;
flist[F_ADDITIONAL_JSON_HEADER] = &slsReceiverTCPIPInterface::set_additional_json_header;
#ifdef VERYVERBOSE
for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) {
@ -716,6 +718,14 @@ int slsReceiverTCPIPInterface::send_update() {
if (path != NULL)
delete[] path;
// additional json header
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
path = receiverBase->getAdditionalJsonHeader();
#endif
mySock->SendDataOnly(path,MAX_STR_LENGTH);
if (path != NULL)
delete[] path;
// gap pixels enable
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind = (int)receiverBase->getGapPixelsEnable();
@ -2549,3 +2559,52 @@ int slsReceiverTCPIPInterface::restream_stop(){
// return ok/fail
return ret;
}
int slsReceiverTCPIPInterface::set_additional_json_header() {
ret = OK;
memset(mess, 0, sizeof(mess));
char arg[MAX_STR_LENGTH];
memset(arg, 0, sizeof(arg));
char* retval=NULL;
// receive arguments
if (mySock->ReceiveDataOnly(arg,MAX_STR_LENGTH) < 0 )
return printSocketReadError();
// execute action
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (receiverBase == NULL)
invalidReceiverObject();
else {
// set
if (mySock->differentClients && lockStatus)
receiverlocked();
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
receiverBase->setAdditionalJsonHeader(arg);
}
//get
retval = receiverBase->getAdditionalJsonHeader();
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "additional json header:" << retval;
#endif
if (ret == OK && mySock->differentClients)
ret = FORCE_UPDATE;
// send answer
mySock->SendDataOnly(&ret,sizeof(ret));
if (ret == FAIL)
mySock->SendDataOnly(mess,sizeof(mess));
mySock->SendDataOnly(retval,MAX_STR_LENGTH);
delete[] retval;
// return ok/fail
return ret;
}