made default streamer to be the ip related to hostname, otherwise can be specified from the client, also made it compatible with chip test board detector now

This commit is contained in:
Dhanya Maliakal
2017-09-12 15:00:59 +02:00
parent c37d3feae9
commit 5ff574b33f
13 changed files with 324 additions and 34 deletions

View File

@ -154,10 +154,10 @@ int DataStreamer::SetThreadPriority(int priority) {
}
int DataStreamer::CreateZmqSockets(int* nunits, uint32_t port) {
int DataStreamer::CreateZmqSockets(int* nunits, uint32_t port, const char* srcip) {
uint32_t portnum = port + index;
zmqSocket = new ZmqSocket(portnum);
zmqSocket = new ZmqSocket(portnum, (strlen(srcip)?srcip:NULL));
if (zmqSocket->IsError()) {
bprintf(RED, "Error: Could not create Zmq socket on port %d for Streamer %d\n", portnum, index);
return FAIL;

View File

@ -47,6 +47,7 @@ void UDPBaseImplementation::initializeMembers(){
acquisitionTime = 0;
subExpTime = 0;
numberOfFrames = 0;
numberOfSamples = 0;
dynamicRange = 16;
tengigaEnable = false;
fifoDepth = 0;
@ -80,6 +81,7 @@ void UDPBaseImplementation::initializeMembers(){
frameToGuiTimerinMS = DEFAULT_STREAMING_TIMER_IN_MS;
dataStreamEnable = false;
streamingPort = 0;
memset(streamingSrcIP, 0, sizeof(streamingSrcIP));
}
UDPBaseImplementation::~UDPBaseImplementation(){}
@ -197,6 +199,8 @@ uint64_t UDPBaseImplementation::getSubExpTime() const{ FILE_LOG(logDEBUG) << __A
uint64_t UDPBaseImplementation::getNumberOfFrames() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return numberOfFrames;}
uint64_t UDPBaseImplementation::getNumberofSamples() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return numberOfSamples;}
uint32_t UDPBaseImplementation::getDynamicRange() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return dynamicRange;}
bool UDPBaseImplementation::getTenGigaEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return tengigaEnable;}
@ -210,6 +214,15 @@ int UDPBaseImplementation::getActivate() const{FILE_LOG(logDEBUG) << __AT__ << "
uint32_t UDPBaseImplementation::getStreamingPort() const{FILE_LOG(logDEBUG) << __AT__ << " starting"; return streamingPort;}
char *UDPBaseImplementation::getStreamingSourceIP() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,streamingSrcIP);
//freed by calling function
return output;
}
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
@ -435,6 +448,16 @@ int UDPBaseImplementation::setNumberOfFrames(const uint64_t i){
return OK;
}
int UDPBaseImplementation::setNumberofSamples(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
numberOfSamples = i;
FILE_LOG(logINFO) << "Number of Samples: " << numberOfSamples;
//overrridden child classes might return FAIL
return OK;
}
int UDPBaseImplementation::setDynamicRange(const uint32_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -559,6 +582,12 @@ void UDPBaseImplementation::setStreamingPort(const uint32_t i) {
FILE_LOG(logINFO) << "Streaming Port: " << streamingPort;
}
void UDPBaseImplementation::setStreamingSourceIP(const char c[]){
FILE_LOG(logDEBUG) << __AT__ << " starting";
strcpy(streamingSrcIP, c);
FILE_LOG(logINFO) << "Streaming Source IP: " << streamingSrcIP;
}
/***callback functions***/
void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg){

View File

@ -15,6 +15,7 @@
#include <cstdlib> //system
#include <cstring> //strcpy
#include <errno.h> //eperm
#include <math.h> //ceil
using namespace std;
@ -54,6 +55,7 @@ void UDPStandardImplementation::InitializeMembers() {
//*** receiver parameters ***
numThreads = 1;
numberofJobs = 1;
nroichannels = 0;
//** class objects ***
generalData = 0;
@ -164,17 +166,6 @@ int UDPStandardImplementation::setShortFrameEnable(const int i) {
int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) {
if (frameToGuiFrequency != freq) {
frameToGuiFrequency = freq;
/*//only the ones lisening to more than 1 frame at a time needs to change fifo structure
switch (myDetectorType) {
case GOTTHARD:
case PROPIX:
if (SetupFifoStructure() == FAIL)
return FAIL;
break;
default:
break;
}*/
}
FILE_LOG (logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency;
return OK;
@ -199,7 +190,7 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {\
// check again
if (streamingPort == 0)
streamingPort = DEFAULT_ZMQ_PORTNO + (detID * ((myDetectorType == EIGER) ? 2 : 1) ); // multiplied by 2 as eiger has 2 ports
if (dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort) == FAIL) {
if (dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP) == FAIL) {
error = true;
break;
}
@ -223,6 +214,25 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {\
}
int UDPStandardImplementation::setNumberofSamples(const uint64_t i) {
if (numberOfSamples != i) {
numberOfSamples = i;
//side effects
uint32_t ppf = ceil(double(2 * (nroichannels ? nroichannels : DEFAULT_NROI_CHANNELS) * numberOfSamples) / double(generalData->dataSize));
generalData->SetPacketsPerFrame(ppf);
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
}
FILE_LOG (logINFO) << "Number of Samples: " << numberOfSamples;
FILE_LOG (logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame);
return OK;
}
int UDPStandardImplementation::setDynamicRange(const uint32_t i) {
if (dynamicRange != i) {
dynamicRange = i;

View File

@ -281,7 +281,8 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) {
case F_SET_RECEIVER_FILE_FORMAT: return "F_SET_RECEIVER_FILE_FORMAT";
case F_SEND_RECEIVER_DETPOSID: return "F_SEND_RECEIVER_DETPOSID";
case F_SEND_RECEIVER_MULTIDETSIZE: return "F_SEND_RECEIVER_MULTIDETSIZE";
case F_SET_RECEIVER_STREAMING_PORT: return "F_SET_RECEIVER_STREAMING_PORT";
case F_RECEIVER_STREAMING_SRC_IP: return "F_RECEIVER_STREAMING_SRC_IP";
default: return "Unknown Function";
}
}
@ -328,6 +329,7 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_SEND_RECEIVER_DETPOSID] = &slsReceiverTCPIPInterface::set_detector_posid;
flist[F_SEND_RECEIVER_MULTIDETSIZE] = &slsReceiverTCPIPInterface::set_multi_detector_size;
flist[F_SET_RECEIVER_STREAMING_PORT] = &slsReceiverTCPIPInterface::set_streaming_port;
flist[F_RECEIVER_STREAMING_SRC_IP] = &slsReceiverTCPIPInterface::set_streaming_source_ip;
#ifdef VERYVERBOSE
for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) {
FILE_LOG(logINFO) << "function fnum: " << i << " (" << getFunctionName((enum recFuncs)i) << ") located at " << (unsigned int)flist[i];
@ -681,6 +683,15 @@ int slsReceiverTCPIPInterface::send_update() {
#endif
mySock->SendDataOnly(&ind,sizeof(ind));
// streaming source ip
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
path = receiverBase->getStreamingSourceIP();
#endif
mySock->SendDataOnly(path,MAX_STR_LENGTH);
if (path != NULL)
delete[] path;
if (!lockStatus)
strcpy(mySock->lastClientIP,mySock->thisClientIP);
@ -997,6 +1008,15 @@ int slsReceiverTCPIPInterface::set_timer() {
case SUBFRAME_ACQUISITION_TIME:
receiverBase->setSubExpTime(index[1]);
break;
case SAMPLES_JCTB:
if (myDetectorType != JUNGFRAUCTB) {
ret = FAIL;
sprintf(mess,"This timer mode (%lld) does not exist for this receiver type\n", (long long int)index[0]);
FILE_LOG(logERROR) << "Warning: " << mess;
break;
}
receiverBase->setNumberofSamples(index[1]);
break;
default:
ret = FAIL;
sprintf(mess,"This timer mode (%lld) does not exist for receiver\n", (long long int)index[0]);
@ -1019,6 +1039,15 @@ int slsReceiverTCPIPInterface::set_timer() {
case SUBFRAME_ACQUISITION_TIME:
retval=receiverBase->getSubExpTime();
break;
case SAMPLES_JCTB:
if (myDetectorType != JUNGFRAUCTB) {
ret = FAIL;
sprintf(mess,"This timer mode (%lld) does not exist for this receiver type\n", (long long int)index[0]);
FILE_LOG(logERROR) << "Warning: " << mess;
break;
}
retval=receiverBase->getNumberofSamples();
break;
default:
ret = FAIL;
sprintf(mess,"This timer mode (%lld) does not exist for receiver\n", (long long int)index[0]);
@ -1404,7 +1433,8 @@ int slsReceiverTCPIPInterface::set_file_dir() {
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "file path:" << retval;
if (retval != NULL)
FILE_LOG(logDEBUG1) << "file path:" << retval;
#endif
if (ret == OK && mySock->differentClients)
@ -2366,3 +2396,54 @@ int slsReceiverTCPIPInterface::set_streaming_port() {
// return ok/fail
return ret;
}
int slsReceiverTCPIPInterface::set_streaming_source_ip() {
ret = OK;
memset(mess, 0, sizeof(mess));
char arg[MAX_STR_LENGTH];
memset(arg, 0, sizeof(arg));
char* retval=NULL;
// receive arguments
if (mySock->ReceiveDataOnly(arg,MAX_STR_LENGTH) < 0 )
return printSocketReadError();
// execute action
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (receiverBase == NULL)
invalidReceiverObject();
else {
// set
if (mySock->differentClients && lockStatus)
receiverlocked();
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
receiverBase->setStreamingSourceIP(arg);
}
//get
retval = receiverBase->getStreamingSourceIP();
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "streaming source ip:" << retval;
#endif
if (ret == OK && mySock->differentClients)
ret = FORCE_UPDATE;
// send answer
mySock->SendDataOnly(&ret,sizeof(ret));
if (ret == FAIL)
mySock->SendDataOnly(mess,sizeof(mess));
mySock->SendDataOnly(retval,MAX_STR_LENGTH);
delete[] retval;
// return ok/fail
return ret;
}