crazy amount of changes, both necessary and unnecessary;need to narrow down the real change later

This commit is contained in:
Dhanya Maliakal 2017-04-27 14:05:04 +02:00
parent b6b0df62b6
commit b9275646ad
3 changed files with 256 additions and 172 deletions

View File

@ -100,7 +100,6 @@ enum communicationProtocol{
genericSocket(const char* const host_ip_or_name, unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE) :
// portno(port_number),
protocol(p),
is_a_server(0),
socketDescriptor(-1),
@ -109,11 +108,15 @@ enum communicationProtocol{
nsending(0),
nsent(0),
total_sent(0),// sender (client): where to? ip
header_packet_size(0)
header_packet_size(0),
portno(port_number)
{
memset(&serverAddress, 0,sizeof(serverAddress));
memset(&clientAddress,0,sizeof(clientAddress));
// strcpy(hostname,host_ip_or_name);
memset(lastClientIP, 0, INET_ADDRSTRLEN);
memset(thisClientIP, 0, INET_ADDRSTRLEN);
memset(dummyClientIP, 0, INET_ADDRSTRLEN);
strcpy(lastClientIP,"none");
strcpy(thisClientIP,"none1");
@ -164,7 +167,6 @@ enum communicationProtocol{
*/
genericSocket(unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, const char *eth=NULL, int hsize=0):
//portno(port_number),
protocol(p),
is_a_server(1),
socketDescriptor(-1),
@ -173,7 +175,8 @@ enum communicationProtocol{
nsending(0),
nsent(0),
total_sent(0),
header_packet_size(hsize)
header_packet_size(hsize),
portno(port_number)
{
memset(&serverAddress, 0, sizeof(serverAddress));
@ -181,7 +184,9 @@ enum communicationProtocol{
/* // you can specify an IP address: */
/* // or you can let it automatically select one: */
/* myaddr.sin_addr.s_addr = INADDR_ANY; */
memset(lastClientIP, 0, INET_ADDRSTRLEN);
memset(thisClientIP, 0, INET_ADDRSTRLEN);
memset(dummyClientIP, 0, INET_ADDRSTRLEN);
strcpy(lastClientIP,"none");
strcpy(thisClientIP,"none1");
@ -610,6 +615,7 @@ enum communicationProtocol{
break;
case UDP:
if (socketDescriptor<0) return -1;
//if length given, listens to length, else listens for packetsize till length is reached
if(length){
@ -632,12 +638,30 @@ enum communicationProtocol{
else{
//normal
nsending=packet_size;
/* if(portno%2){
cprintf(BLUE,"%d total_sent set to zero:%d\n",portno, total_sent);fflush(stdout);
}else{
cprintf(GREEN,"%d total_sent set to zero:%d\n",portno, total_sent);fflush(stdout);
}*/
while(1){
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
if(nsent<=0 || nsent == packet_size)
/* if(portno%2){
cprintf(BLUE,"%d nsent:%d total_sent:%d\n", portno, nsent, total_sent);fflush(stdout);
}else{
cprintf(GREEN,"%d nsent:%d total_sent:%d\n", portno, nsent, total_sent);fflush(stdout);
}*/
if((nsent<=0) || (nsent == packet_size)) {
/* if(portno%2){
cprintf(BLUE,"%d breaking out of loop %d\n",portno, nsent);fflush(stdout);
}else{
cprintf(GREEN,"%d breaking out of loop %d\n",portno, nsent);fflush(stdout);
}*/
break;
if(nsent != packet_size && nsent != header_packet_size)
cprintf(RED,"Incomplete Packet size %d\n",nsent);
}
if(nsent != packet_size && nsent != header_packet_size){
cprintf(RED,"%d Incomplete Packet size %d\n",portno, nsent);fflush(stdout);
}
}
//nsent = 1040;
total_sent+=nsent;
@ -649,8 +673,13 @@ enum communicationProtocol{
#ifdef VERY_VERBOSE
cout << "sent "<< total_sent << " Bytes" << endl;
#endif
/*if(protocol == UDP){
if(portno%2){
cprintf(BLUE,"%d exiting total sent %d\n",portno, total_sent);fflush(stdout);
}else{
cprintf(GREEN,"%d exiting total sent %d\n",portno, total_sent);fflush(stdout);
}
}*/
return total_sent;
@ -723,11 +752,11 @@ enum communicationProtocol{
private:
int nsending;
int nsent;
int total_sent;
volatile int nsending;
volatile int nsent;
volatile int total_sent;
int header_packet_size;
const int portno;
// pthread_mutex_t mp;

View File

@ -35,7 +35,7 @@ void UDPBaseImplementation::initializeMembers(){
//**detector parameters***
myDetectorType = GENERIC;
strcpy(detHostname,"");
memset(detHostname,0,MAX_STR_LENGTH);
packetsPerFrame = 0;
acquisitionPeriod = 0;
acquisitionTime = 0;
@ -51,14 +51,15 @@ void UDPBaseImplementation::initializeMembers(){
activated = true;
//***connection parameters***
strcpy(eth,"");
memset(eth,0,MAX_STR_LENGTH);
for(int i=0;i<MAX_NUMBER_OF_LISTENING_THREADS;i++){
udpPortNum[i] = DEFAULT_UDP_PORTNO + i;
}
//***file parameters***
strcpy(fileName,"run");
strcpy(filePath,"");
memset(fileName,0,MAX_STR_LENGTH);
snprintf(fileName,MAX_STR_LENGTH,"run");
memset(filePath,0,MAX_STR_LENGTH);
fileIndex = 0;
scanTag = 0;
frameIndexEnable = false;
@ -96,7 +97,8 @@ char *UDPBaseImplementation::getDetectorHostname() const{
return NULL;
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,detHostname);
memset(output,0,MAX_STR_LENGTH);
snprintf(output,MAX_STR_LENGTH,detHostname);
//freed by calling function
return output;
}
@ -115,7 +117,8 @@ char *UDPBaseImplementation::getFileName() const{
return NULL;
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,fileName);
memset(output,0,MAX_STR_LENGTH);
snprintf(output,MAX_STR_LENGTH,fileName);
//freed by calling function
return output;
}
@ -127,7 +130,8 @@ char *UDPBaseImplementation::getFilePath() const{
return NULL;
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,filePath);
memset(output,0,MAX_STR_LENGTH);
snprintf(output,MAX_STR_LENGTH,filePath);
//freed by calling function
return output;
}
@ -166,7 +170,8 @@ char *UDPBaseImplementation::getEthernetInterface() const{
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,eth);
memset(output,0,MAX_STR_LENGTH);
snprintf(output,MAX_STR_LENGTH,eth);
//freed by calling function
return output;
}
@ -215,7 +220,7 @@ void UDPBaseImplementation::setFlippedData(int axis, int enable){
flippedData[axis] = enable==0?0:1;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Flipped Data: %d , %d ", flippedData[0], flippedData[1]);
snprintf(cstreambuf, MAX_STR_LENGTH, "Flipped Data: %d , %d ", flippedData[0], flippedData[1]);
FILE_LOG(logINFO, cstreambuf);
}
@ -223,11 +228,13 @@ void UDPBaseImplementation::setFlippedData(int axis, int enable){
/***file parameters***/
void UDPBaseImplementation::setFileName(const char c[]){
if(strlen(c))
strcpy(fileName, c);
if(strlen(c)){
memset(fileName,0,MAX_STR_LENGTH);
snprintf(fileName,MAX_STR_LENGTH,c);
}
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "File name: %s ", fileName);
snprintf(cstreambuf, MAX_STR_LENGTH, "File name: %s ", fileName);
FILE_LOG(logINFO, cstreambuf);
}
@ -236,20 +243,20 @@ void UDPBaseImplementation::setFilePath(const char c[]){
if(strlen(c)){
//check if filepath exists
struct stat st;
if(stat(c,&st) == 0)
strcpy(filePath,c);
else{
strcpy(filePath,"");
if(stat(c,&st) == 0){
memset(filePath,0,MAX_STR_LENGTH);
snprintf(filePath,MAX_STR_LENGTH,c);
}else{
memset(filePath,0,MAX_STR_LENGTH);
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "FilePath does not exist: %s ", filePath);
snprintf(cstreambuf, MAX_STR_LENGTH, "FilePath does not exist: %s ", filePath);
FILE_LOG(logWARNING, cstreambuf);
}
strcpy(filePath, c);
}
/*{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "File path:: %s ", filePath);
snprintf(cstreambuf, MAX_STR_LENGTH, "File path:: %s ", filePath);
FILE_LOG(logDEBUG, cstreambuf);
}*/
}
@ -258,7 +265,7 @@ void UDPBaseImplementation::setFileIndex(const uint64_t i){
fileIndex = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "File Index: %lu ", fileIndex);
snprintf(cstreambuf, MAX_STR_LENGTH, "File Index: %lu ", fileIndex);
FILE_LOG(logINFO, cstreambuf);
}
@ -267,7 +274,7 @@ void UDPBaseImplementation::setScanTag(const int i){
scanTag = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Scan Tag: %d ", scanTag);
snprintf(cstreambuf, MAX_STR_LENGTH, "Scan Tag: %d ", scanTag);
FILE_LOG(logINFO, cstreambuf);
}
@ -275,7 +282,7 @@ void UDPBaseImplementation::setFrameIndexEnable(const bool b){
frameIndexEnable = b;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Frame Index Enable: %s ", stringEnable(frameIndexEnable).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Frame Index Enable: %s ", stringEnable(frameIndexEnable).c_str());
FILE_LOG(logINFO, cstreambuf);
}
@ -283,7 +290,7 @@ void UDPBaseImplementation::setFileWriteEnable(const bool b){
fileWriteEnable = b;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "File Write Enable: %s ", stringEnable(fileWriteEnable).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "File Write Enable: %s ", stringEnable(fileWriteEnable).c_str());
FILE_LOG(logINFO, cstreambuf);
}
@ -291,7 +298,7 @@ void UDPBaseImplementation::setOverwriteEnable(const bool b){
overwriteEnable = b;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Overwrite Enable: %s ", stringEnable(overwriteEnable).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Overwrite Enable: %s ", stringEnable(overwriteEnable).c_str());
FILE_LOG(logINFO, cstreambuf);
}
@ -299,7 +306,7 @@ int UDPBaseImplementation::setDataCompressionEnable(const bool b){
dataCompressionEnable = b;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Data Compression: %s ", stringEnable(dataCompressionEnable).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Data Compression: %s ", stringEnable(dataCompressionEnable).c_str());
FILE_LOG(logINFO, cstreambuf);
//overridden methods might return FAIL
@ -312,7 +319,7 @@ void UDPBaseImplementation::setUDPPortNumber(const uint32_t i){
udpPortNum[0] = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "UDP Port Number[0]: %u ", udpPortNum[0]);
snprintf(cstreambuf, MAX_STR_LENGTH, "UDP Port Number[0]: %u ", udpPortNum[0]);
FILE_LOG(logINFO, cstreambuf);
}
@ -320,15 +327,15 @@ void UDPBaseImplementation::setUDPPortNumber2(const uint32_t i){
udpPortNum[1] = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "UDP Port Number[1]: %u ", udpPortNum[1]);
snprintf(cstreambuf, MAX_STR_LENGTH, "UDP Port Number[1]: %u ", udpPortNum[1]);
FILE_LOG(logINFO, cstreambuf);
}
void UDPBaseImplementation::setEthernetInterface(const char* c){
strcpy(eth, c);
memset(eth,0, MAX_STR_LENGTH);
snprintf(eth,MAX_STR_LENGTH, c);
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Ethernet Interface: %s ", eth);
snprintf(cstreambuf, MAX_STR_LENGTH, "Ethernet Interface: %s ", eth);
FILE_LOG(logINFO, cstreambuf);
}
@ -338,7 +345,7 @@ void UDPBaseImplementation::setShortFrameEnable(const int i){
shortFrameEnable = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Short Frame Enable: %d ", shortFrameEnable);
snprintf(cstreambuf, MAX_STR_LENGTH, "Short Frame Enable: %d ", shortFrameEnable);
FILE_LOG(logINFO, cstreambuf);
}
@ -346,7 +353,7 @@ int UDPBaseImplementation::setFrameToGuiFrequency(const uint32_t freq){
frameToGuiFrequency = freq;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Frame To Gui Frequency: %u ", frameToGuiFrequency);
snprintf(cstreambuf, MAX_STR_LENGTH, "Frame To Gui Frequency: %u ", frameToGuiFrequency);
FILE_LOG(logINFO, cstreambuf);
//overrridden child classes might return FAIL
@ -357,7 +364,7 @@ void UDPBaseImplementation::setFrameToGuiTimer(const uint32_t time_in_ms){
frameToGuiTimerinMS = time_in_ms;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Frame To Gui Timer: %u ", frameToGuiTimerinMS);
snprintf(cstreambuf, MAX_STR_LENGTH, "Frame To Gui Timer: %u ", frameToGuiTimerinMS);
FILE_LOG(logINFO, cstreambuf);
}
@ -366,7 +373,7 @@ uint32_t UDPBaseImplementation::setDataStreamEnable(const uint32_t enable){
dataStreamEnable = enable;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Streaming Data from Receiver: %d ", dataStreamEnable);
snprintf(cstreambuf, MAX_STR_LENGTH, "Streaming Data from Receiver: %d ", dataStreamEnable);
FILE_LOG(logINFO, cstreambuf);
//overrridden child classes might return FAIL
@ -378,7 +385,7 @@ int UDPBaseImplementation::setAcquisitionPeriod(const uint64_t i){
acquisitionPeriod = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Acquisition Period: %f s ", (double)acquisitionPeriod/(1E9));
snprintf(cstreambuf, MAX_STR_LENGTH, "Acquisition Period: %f s ", (double)acquisitionPeriod/(1E9));
FILE_LOG(logINFO, cstreambuf);
//overrridden child classes might return FAIL
@ -389,7 +396,7 @@ int UDPBaseImplementation::setAcquisitionTime(const uint64_t i){
acquisitionTime = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Acquisition Time: %f s ", (double)acquisitionTime/(1E9));
snprintf(cstreambuf, MAX_STR_LENGTH, "Acquisition Time: %f s ", (double)acquisitionTime/(1E9));
FILE_LOG(logINFO, cstreambuf);
//overrridden child classes might return FAIL
@ -400,7 +407,7 @@ int UDPBaseImplementation::setNumberOfFrames(const uint64_t i){
numberOfFrames = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Number of Frames: %lu ", numberOfFrames);
snprintf(cstreambuf, MAX_STR_LENGTH, "Number of Frames: %lu ", numberOfFrames);
FILE_LOG(logINFO, cstreambuf);
//overrridden child classes might return FAIL
@ -411,7 +418,7 @@ int UDPBaseImplementation::setDynamicRange(const uint32_t i){
dynamicRange = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Dynamic Range: %u ", dynamicRange);
snprintf(cstreambuf, MAX_STR_LENGTH, "Dynamic Range: %u ", dynamicRange);
FILE_LOG(logINFO, cstreambuf);
//overrridden child classes might return FAIL
@ -422,7 +429,7 @@ int UDPBaseImplementation::setTenGigaEnable(const bool b){
tengigaEnable = b;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Ten Giga Enable: %s ", stringEnable(tengigaEnable).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Ten Giga Enable: %s ", stringEnable(tengigaEnable).c_str());
FILE_LOG(logINFO, cstreambuf);
//overridden functions might return FAIL
@ -433,7 +440,7 @@ int UDPBaseImplementation::setFifoDepth(const uint32_t i){
fifoDepth = i;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Fifo Depth: %u ", i);
snprintf(cstreambuf, MAX_STR_LENGTH, "Fifo Depth: %u ", i);
FILE_LOG(logINFO, cstreambuf);
//overridden functions might return FAIL
@ -452,7 +459,7 @@ int UDPBaseImplementation::setDetectorType(const detectorType d){
myDetectorType = d;
//if eiger, set numberofListeningThreads = 2;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Detector Type: %s ", getDetectorType(d).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Detector Type: %s ", getDetectorType(d).c_str());
FILE_LOG(logINFO, cstreambuf);
return OK;
@ -460,10 +467,12 @@ int UDPBaseImplementation::setDetectorType(const detectorType d){
void UDPBaseImplementation::initialize(const char *c){
if(strlen(c))
strcpy(detHostname, c);
if(strlen(c)){
memset(detHostname,0,MAX_STR_LENGTH);
snprintf(detHostname, MAX_STR_LENGTH, c);
}
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Detector Hostname: %s ", detHostname);
snprintf(cstreambuf, MAX_STR_LENGTH, "Detector Hostname: %s ", detHostname);
FILE_LOG(logINFO, cstreambuf);
}
@ -475,7 +484,7 @@ void UDPBaseImplementation::resetAcquisitionCount(){
totalPacketsCaught = 0;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Total Packets Caught: %lu ", totalPacketsCaught);
snprintf(cstreambuf, MAX_STR_LENGTH, "Total Packets Caught: %lu ", totalPacketsCaught);
FILE_LOG(logINFO, cstreambuf);
}
@ -528,7 +537,7 @@ int UDPBaseImplementation::setActivate(int enable){
activated = enable;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Activation: %s ", stringEnable(activated).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Activation: %s ", stringEnable(activated).c_str());
FILE_LOG(logINFO, cstreambuf);
}

View File

@ -146,9 +146,9 @@ void UDPStandardImplementation::initializeMembers(){
}
#endif
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
strcpy(completeFileName[i],"");
strcpy(fileNamePerThread[i],"");
strcpy(fileHeader[i],"");
memset(completeFileName[i], 0, MAX_STR_LENGTH);
memset(fileNamePerThread[i], 0, MAX_STR_LENGTH);
memset(fileHeader[i], 0, FILE_HEADER_SIZE);
sfilefd[i] = 0;
}
maxFramesPerFile = 0;
@ -190,7 +190,7 @@ void UDPStandardImplementation::initializeMembers(){
//***receiver to GUI parameters***
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
latestData[i] = 0;
strcpy(guiFileName[i],"");
memset(guiFileName[i],0,MAX_STR_LENGTH);
guiNumPackets[i] = 0;
frametoGuiCounter[i] = 0;
}
@ -309,7 +309,7 @@ int UDPStandardImplementation::setupFifoStructure(){
}
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Number of Frames per buffer: %d ",numberofJobsPerBuffer);
snprintf(cstreambuf, MAX_STR_LENGTH, "Number of Frames per buffer: %d ",numberofJobsPerBuffer);
FILE_LOG(logINFO, cstreambuf);
}
}
@ -332,7 +332,7 @@ int UDPStandardImplementation::setupFifoStructure(){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Total Fifo Size: %u ",fifoSize);
snprintf(cstreambuf, MAX_STR_LENGTH, "Total Fifo Size: %u ",fifoSize);
FILE_LOG(logINFO, cstreambuf);
}
@ -426,10 +426,13 @@ void UDPStandardImplementation::setFileName(const char c[]){
char oldfilename[MAX_STR_LENGTH];
strcpy(oldfilename,fileName);
memset(oldfilename,0,MAX_STR_LENGTH);
snprintf(oldfilename,MAX_STR_LENGTH, fileName);
if(strlen(c))
strcpy(fileName, c);
if(strlen(c)){
memset(fileName,0,MAX_STR_LENGTH);
snprintf(fileName,MAX_STR_LENGTH, c);
}
if(strlen(fileName)){
int detindex = -1;
@ -445,7 +448,7 @@ void UDPStandardImplementation::setFileName(const char c[]){
}
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "File name: %s ",fileName);
snprintf(cstreambuf, MAX_STR_LENGTH, "File name: %s ",fileName);
FILE_LOG(logINFO, cstreambuf);
}
}
@ -488,7 +491,7 @@ int UDPStandardImplementation::setDataCompressionEnable(const bool b){
initializeFilter();
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Data Compression: %s ",stringEnable(dataCompressionEnable).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Data Compression: %s ",stringEnable(dataCompressionEnable).c_str());
FILE_LOG(logINFO, cstreambuf);
}
return OK;
@ -539,7 +542,7 @@ void UDPStandardImplementation::setShortFrameEnable(const int i){
initializeFilter();
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Short Frame Enable: %d ", shortFrameEnable);
snprintf(cstreambuf, MAX_STR_LENGTH, "Short Frame Enable: %d ", shortFrameEnable);
FILE_LOG(logINFO, cstreambuf);
}
}
@ -553,7 +556,7 @@ int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq){
return FAIL;
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Frame to Gui Frequency: %u ",frameToGuiFrequency);
snprintf(cstreambuf, MAX_STR_LENGTH, "Frame to Gui Frequency: %u ",frameToGuiFrequency);
FILE_LOG(logINFO, cstreambuf);
}
return OK;
@ -568,7 +571,7 @@ uint32_t UDPStandardImplementation::setDataStreamEnable(const uint32_t enable){
dataStreamEnable = enable;
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Data Send to Gui: %d ", dataStreamEnable);
snprintf(cstreambuf, MAX_STR_LENGTH, "Data Send to Gui: %d ", dataStreamEnable);
FILE_LOG(logINFO, cstreambuf);
}
@ -599,7 +602,7 @@ int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Acquisition Period: %f s", (double)acquisitionPeriod/(1E9));
snprintf(cstreambuf, MAX_STR_LENGTH, "Acquisition Period: %f s", (double)acquisitionPeriod/(1E9));
FILE_LOG(logINFO, cstreambuf);
}
@ -620,7 +623,7 @@ int UDPStandardImplementation::setAcquisitionTime(const uint64_t i){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Acquisition Period: %f s ", (double)acquisitionTime/(1E9));
snprintf(cstreambuf, MAX_STR_LENGTH, "Acquisition Period: %f s ", (double)acquisitionTime/(1E9));
FILE_LOG(logINFO, cstreambuf);
}
@ -641,7 +644,7 @@ int UDPStandardImplementation::setNumberOfFrames(const uint64_t i){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Number of Frames: %lu ", numberOfFrames);
snprintf(cstreambuf, MAX_STR_LENGTH, "Number of Frames: %lu ", numberOfFrames);
FILE_LOG(logINFO, cstreambuf);
}
@ -659,7 +662,7 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Setting Dynamic Range to %u ", i);
snprintf(cstreambuf, MAX_STR_LENGTH, "Setting Dynamic Range to %u ", i);
FILE_LOG(logDEBUG1, cstreambuf);
}
@ -683,6 +686,7 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i){
}
for(int i=0;i<numberofWriterThreads;i++){
latestData[i] = new char[bufferSize+sizeof(sls_detector_header)]();
memset(latestData[i], 0, bufferSize+sizeof(sls_detector_header));
}
//restructure fifo
numberofJobsPerBuffer = -1;
@ -695,7 +699,7 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Dynamic Range: %u ", dynamicRange);
snprintf(cstreambuf, MAX_STR_LENGTH, "Dynamic Range: %u ", dynamicRange);
FILE_LOG(logINFO, cstreambuf);
}
return OK;
@ -707,7 +711,7 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Setting Ten Giga to %s ", stringEnable(b).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Setting Ten Giga to %s ", stringEnable(b).c_str());
FILE_LOG(logDEBUG1, cstreambuf);
}
@ -731,7 +735,7 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "packetsPerFrame: %u\n"
snprintf(cstreambuf, MAX_STR_LENGTH, "packetsPerFrame: %u\n"
"onePacketSize: %d\n"
"oneDataSize: %d\n"
"bufferSize: %d ",
@ -751,6 +755,7 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b){
}
for(int i=0;i<numberofWriterThreads;i++){
latestData[i] = new char[bufferSize+sizeof(sls_detector_header)]();
memset(latestData[i], 0, bufferSize+sizeof(sls_detector_header));
}
//restructure fifo
@ -763,7 +768,7 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Ten Giga: %s ", stringEnable(tengigaEnable).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Ten Giga: %s ", stringEnable(tengigaEnable).c_str());
FILE_LOG(logINFO, cstreambuf);
}
return OK;
@ -776,7 +781,7 @@ int UDPStandardImplementation::setFifoDepth(const uint32_t i){
if(i != fifoDepth){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Fifo Depth: %u ", i);
snprintf(cstreambuf, MAX_STR_LENGTH, "Fifo Depth: %u ", i);
FILE_LOG(logINFO, cstreambuf);
}
fifoDepth = i;
@ -814,11 +819,11 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
case EIGER:
case JUNGFRAUCTB:
case JUNGFRAU:
sprintf(cstreambuf, " ***** %s Receiver *** ", getDetectorType(d).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, " ***** %s Receiver *** ", getDetectorType(d).c_str());
FILE_LOG(logINFO, cstreambuf);
break;
default:
sprintf(cstreambuf, "This is an unknown receiver type %d ", (int)d);
snprintf(cstreambuf, MAX_STR_LENGTH, "This is an unknown receiver type %d ", (int)d);
FILE_LOG(logERROR, cstreambuf);
return FAIL;
}
@ -913,7 +918,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
excludeMissingPackets=true;
break;
default:
sprintf(cstreambuf, "This is an unknown receiver type %d ", (int)d);
snprintf(cstreambuf, MAX_STR_LENGTH, "This is an unknown receiver type %d ", (int)d);
FILE_LOG(logERROR, cstreambuf);
return FAIL;
}
@ -950,10 +955,14 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
if(latestData[i]) {delete[] latestData[i];latestData[i] = 0;}
}
for(int i=0; i<numberofWriterThreads; i++){
if(excludeMissingPackets)
if(excludeMissingPackets){
latestData[i] = new char[bufferSize+sizeof(sls_detector_header)]();
else
memset(latestData[i], 0, bufferSize+sizeof(sls_detector_header));
}
else{
latestData[i] = new char[bufferSize]();
memset(latestData[i], 0, bufferSize);
}
}
@ -964,7 +973,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
}
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Detector type set to %s ", getDetectorType(d).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Detector type set to %s ", getDetectorType(d).c_str());
FILE_LOG(logDEBUG1, cstreambuf);
}
return OK;
@ -1028,7 +1037,7 @@ int UDPStandardImplementation::startReceiver(char *c){
//reset gui variables
frametoGuiCounter[i] = 0;
guiNumPackets[i] = 0;
strcpy(guiFileName[i],"");
memset(guiFileName[i],0,MAX_STR_LENGTH);
}
pthread_mutex_lock(&writeMutex);
@ -1047,16 +1056,16 @@ int UDPStandardImplementation::startReceiver(char *c){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
if(myDetectorType != EIGER){
sprintf(cstreambuf, "Data Compression has been %s ", stringEnable(dataCompressionEnable).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Data Compression has been %s ", stringEnable(dataCompressionEnable).c_str());
FILE_LOG(logINFO, cstreambuf);
}
sprintf(cstreambuf, "Number of Jobs Per Buffer: %d\n"
snprintf(cstreambuf, MAX_STR_LENGTH, "Number of Jobs Per Buffer: %d\n"
"Max Frames Per File: %lu ",
numberofJobsPerBuffer, maxFramesPerFile);
FILE_LOG(logINFO, cstreambuf);
if(frameToGuiFrequency) {
sprintf(cstreambuf, "Frequency of frames sent to gui: %u ", frameToGuiFrequency);
snprintf(cstreambuf, MAX_STR_LENGTH, "Frequency of frames sent to gui: %u ", frameToGuiFrequency);
FILE_LOG(logINFO, cstreambuf);
} else
FILE_LOG(logINFO, "Frequency of frames sent to gui: Random ");
@ -1064,7 +1073,8 @@ int UDPStandardImplementation::startReceiver(char *c){
//create UDP sockets
if(createUDPSockets() == FAIL){
strcpy(c,"Could not create UDP Socket(s).");
memset(c, 0, MAX_STR_LENGTH);
snprintf(c,MAX_STR_LENGTH,"Could not create UDP Socket(s).");
FILE_LOG(logERROR,"Could not create UDP Socket(s). ");
return FAIL;
}
@ -1072,7 +1082,8 @@ int UDPStandardImplementation::startReceiver(char *c){
if(setupWriter() == FAIL){
//stop udp socket
shutDownUDPSockets();
strcpy(c,"Could not create file");
memset(c, 0, MAX_STR_LENGTH);
snprintf(c,MAX_STR_LENGTH,"Could not create file");
FILE_LOG(logERROR, "Could not create file ");
for(int i=0; i < numberofWriterThreads; i++)
@ -1081,8 +1092,9 @@ int UDPStandardImplementation::startReceiver(char *c){
}
//For compression, just for gui purposes
if(dataCompressionEnable)
sprintf(completeFileName[0], "%s/%s_fxxx_%lld_xx.root", filePath,fileNamePerThread[0],(long long int)fileIndex);
if(dataCompressionEnable){
memset(completeFileName[0],0,MAX_STR_LENGTH);
snprintf(completeFileName[0], MAX_STR_LENGTH, "%s/%s_fxxx_%lld_xx.root", filePath,fileNamePerThread[0],(long long int)fileIndex);}
//initialize semaphore to synchronize between writer and gui reader threads
for(int i=0;i<numberofWriterThreads;i++){
@ -1116,7 +1128,7 @@ int UDPStandardImplementation::startReceiver(char *c){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Receiver Started\n"
snprintf(cstreambuf, MAX_STR_LENGTH, "Receiver Started\n"
"Status: %s ", runStatusType(status).c_str());
FILE_LOG(logINFO, cstreambuf);
}
@ -1153,7 +1165,7 @@ void UDPStandardImplementation::stopReceiver(){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Receiver Stopped\n"
snprintf(cstreambuf, MAX_STR_LENGTH, "Receiver Stopped\n"
"Status: %s ", runStatusType(status).c_str());
FILE_LOG(logINFO, cstreambuf);
}
@ -1172,7 +1184,7 @@ int UDPStandardImplementation::shutDownUDPSockets(){
udpSocket[i]->ShutDownSocket();
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Shut down UDP Sock %d ", i);
snprintf(cstreambuf, MAX_STR_LENGTH, "Shut down UDP Sock %d ", i);
FILE_LOG(logINFO, cstreambuf);
delete udpSocket[i];
@ -1267,7 +1279,7 @@ void UDPStandardImplementation::closeFile(int ithread){
if(!dataCompressionEnable){
if(sfilefd[ithread]){
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Going to close file: %d ", fileno(sfilefd[ithread]));
snprintf(cstreambuf, MAX_STR_LENGTH, "Going to close file: %d ", fileno(sfilefd[ithread]));
FILE_LOG(logDEBUG4, cstreambuf);
fflush(sfilefd[ithread]);
@ -1281,7 +1293,7 @@ void UDPStandardImplementation::closeFile(int ithread){
#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1)
if(sfilefd[0]){
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "sfilefd: %d ", fileno(sfilefd[0]));
snprintf(cstreambuf, MAX_STR_LENGTH, "sfilefd: %d ", fileno(sfilefd[0]));
FILE_LOG(logDEBUG4, cstreambuf);
fclose(sfilefd[0]);
@ -1324,7 +1336,7 @@ int UDPStandardImplementation::setActivate(int enable){
if(enable != -1){
activated = enable;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Activation: %s ", stringEnable(activated).c_str());
snprintf(cstreambuf, MAX_STR_LENGTH, "Activation: %s ", stringEnable(activated).c_str());
FILE_LOG(logINFO, cstreambuf);
}
@ -1382,7 +1394,7 @@ int UDPStandardImplementation::createDataCallbackThreads(bool destroy){
currentThreadIndex = i;
if(pthread_create(&dataCallbackThreads[i], NULL,startDataCallbackThread, (void*) this)){
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Could not create data call back thread with index %d ", i);
snprintf(cstreambuf, MAX_STR_LENGTH, "Could not create data call back thread with index %d ", i);
FILE_LOG(logERROR, cstreambuf);
return FAIL;
}
@ -1448,7 +1460,7 @@ int UDPStandardImplementation::createListeningThreads(bool destroy){
currentThreadIndex = i;
if(pthread_create(&listeningThreads[i], NULL,startListeningThread, (void*) this)){
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Could not create listening thread with index %d ", i);
snprintf(cstreambuf, MAX_STR_LENGTH, "Could not create listening thread with index %d ", i);
FILE_LOG(logERROR, cstreambuf);
return FAIL;
}
@ -1514,7 +1526,7 @@ int UDPStandardImplementation::createWriterThreads(bool destroy){
currentThreadIndex = i;
if(pthread_create(&writingThreads[i], NULL,startWritingThread, (void*) this)){
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Could not create writer thread with index %d ", i);
snprintf(cstreambuf, MAX_STR_LENGTH, "Could not create writer thread with index %d ", i);
FILE_LOG(logERROR, cstreambuf);
return FAIL;
}
@ -1595,8 +1607,9 @@ int UDPStandardImplementation::createUDPSockets(){
//if eth is mistaken with ip address
if (strchr(eth,'.') != NULL)
strcpy(eth,"");
if (strchr(eth,'.') != NULL){
memset(eth,0,MAX_STR_LENGTH);
}
shutDownUDPSockets();
int headerpacketsize = 0;
@ -1614,7 +1627,7 @@ int UDPStandardImplementation::createUDPSockets(){
else{
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Ethernet Interface: %s", eth);
snprintf(cstreambuf, MAX_STR_LENGTH, "Ethernet Interface: %s", eth);
FILE_LOG(logINFO, cstreambuf);
}
for(int i=0;i<numberofListeningThreads;i++)
@ -1628,7 +1641,7 @@ int UDPStandardImplementation::createUDPSockets(){
cout << "UDP port opened at port " << port[i] << endl;
}else{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Could not create UDP socket on port %u error: %d ", port[i], iret);
snprintf(cstreambuf, MAX_STR_LENGTH, "Could not create UDP socket on port %u error: %d ", port[i], iret);
FILE_LOG(logERROR, cstreambuf);
shutDownUDPSockets();
return FAIL;
@ -1677,7 +1690,7 @@ int UDPStandardImplementation::setupWriter(){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
for(int i=0; i<numberofWriterThreads; i++){
sprintf(cstreambuf, "%d Going to post 1st semaphore ",i);
snprintf(cstreambuf, MAX_STR_LENGTH, "%d Going to post 1st semaphore ",i);
FILE_LOG(logDEBUG4, cstreambuf);
sem_post(&writerSemaphore[i]);
@ -1710,14 +1723,15 @@ int UDPStandardImplementation::createNewFile(int ithread){
//create file name
memset(completeFileName[ithread],0,MAX_STR_LENGTH);
if(!frameIndexEnable)
sprintf(completeFileName[ithread], "%s/%s_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)fileIndex);
snprintf(completeFileName[ithread], MAX_STR_LENGTH, "%s/%s_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)fileIndex);
else
sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)lastFrameNumberInFile[ithread]+1,(long long int)fileIndex);
snprintf(completeFileName[ithread], MAX_STR_LENGTH, "%s/%s_f%012lld_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)lastFrameNumberInFile[ithread]+1,(long long int)fileIndex);
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "%s ", completeFileName[ithread]);
snprintf(cstreambuf, MAX_STR_LENGTH, "%s ", completeFileName[ithread]);
FILE_LOG(logDEBUG4, cstreambuf);
}
//filewrite enable & we allowed to create/close files
@ -1740,7 +1754,7 @@ int UDPStandardImplementation::createNewFile(int ithread){
if(!overwriteEnable){
if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "wx"))){
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Could not create/overwrite file %s ", completeFileName[ithread]);
snprintf(cstreambuf, MAX_STR_LENGTH, "Could not create/overwrite file %s ", completeFileName[ithread]);
FILE_LOG(logERROR, cstreambuf);
sfilefd[ithread] = 0;
@ -1748,7 +1762,7 @@ int UDPStandardImplementation::createNewFile(int ithread){
}
}else if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "w"))){
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Could not create file %s ", completeFileName[ithread]);
snprintf(cstreambuf, MAX_STR_LENGTH, "Could not create file %s ", completeFileName[ithread]);
FILE_LOG(logERROR, cstreambuf);
sfilefd[ithread] = 0;
@ -1820,13 +1834,14 @@ int UDPStandardImplementation::createCompressionFile(int ithread, int iframe){
#ifdef MYROOT1
char temp[MAX_STR_LENGTH];
memset(temp,0,MAX_STR_LENGTH);
//create file name for gui purposes, and set up acquistion parameters
sprintf(temp, "%s/%s_fxxx_%d_%d.root", filePath,fileNamePerThread[ithread],fileIndex,ithread);
snprintf(temp,MAX_STR_LENGTH, "%s/%s_fxxx_%d_%d.root", filePath,fileNamePerThread[ithread],fileIndex,ithread);
//file
myFile[ithread] = new TFile(temp,"RECREATE");/** later return error if it exists */
cprintf(GREEN,"Writing_Thread %d: Created Compression File: %s\n",ithread, temp);
//tree
sprintf(temp, "%s_fxxx_%d_%d",fileNamePerThread[ithread],fileIndex,ithread);
snprintf(temp, MAX_STR_LENGTH,"%s_fxxx_%d_%d",fileNamePerThread[ithread],fileIndex,ithread);
myTree[ithread]=singlePhotonDetectorObject[ithread]->initEventTree(temp, &iframe);
//resets the pedestalSubtraction array and the commonModeSubtraction
singlePhotonDetectorObject[ithread]->newDataSet();
@ -1878,9 +1893,10 @@ void UDPStandardImplementation::startDataCallback(){
struct timespec begin,end;
// server address to bind
char hostName[100] = "tcp://*:";//"tcp://127.0.0.1:";
char hostName[100];
memset(hostName,0,100);
int portno = DEFAULT_ZMQ_PORTNO + (detID*numberofListeningThreads+ithread);
sprintf(hostName,"%s%d",hostName,portno);
snprintf(hostName,100,"tcp://*:%d",portno);//"tcp://127.0.0.1:";
//socket details
void *context = zmq_ctx_new();
@ -1893,7 +1909,7 @@ void UDPStandardImplementation::startDataCallback(){
// bind
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Thread %d: ZMQ Server at %s ", ithread, hostName);
snprintf(cstreambuf, MAX_STR_LENGTH, "Thread %d: ZMQ Server at %s ", ithread, hostName);
FILE_LOG(logINFO, cstreambuf);
}
@ -1905,19 +1921,18 @@ void UDPStandardImplementation::startDataCallback(){
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
while(true){
int oneframesize = oneDataSize * packetsPerFrame;
char* buffer = new char[packetsPerFrame*oneDataSize]();
memset(buffer,0xFF,oneframesize);
bool randomSendNow = true;
//header details
const char *jsonFmt ="{"
"\"jsonversion\":%u, "
"\"acqIndex\":%llu, "
"\"fIndex\":%llu, "
"\"bitmode\":%d, "
"\"shape\":[%d, %d], "
"\"fname\":\"%s\", "
"\"jsonversion\":%u, "
"\"acqIndex\":%llu, "
"\"fIndex\":%llu, "
"\"bitmode\":%d, "
"\"shapex\": %d, "
"\"shapey\": %d, "
"\"fname\":\"%s\", "
"\"data\": %d, "
"\"frameNumber\":%llu, "
"\"expLength\":%u, "
@ -1932,15 +1947,14 @@ void UDPStandardImplementation::startDataCallback(){
"\"roundRNumber\":%u, "
"\"detType\":%u, "
"\"version\":%u"
"}\n";
"}\n\0";
int npixelsx=0, npixelsy=0;
switch(myDetectorType) {
case JUNGFRAU: npixelsx = JFRAU_PIXELS_IN_ONE_ROW; npixelsy = JFRAU_PIXELS_IN_ONE_COL; break;
case EIGER: npixelsx = EIGER_PIXELS_IN_ONE_ROW; npixelsy = EIGER_PIXELS_IN_ONE_COL; break;
default:break; /* will not work for other detectors*/
}
uint64_t acquisitionIndex = -1;
uint64_t frameIndex = -1;
#ifdef DEBUG
int oldpnum = -1;
#endif
@ -1963,19 +1977,22 @@ void UDPStandardImplementation::startDataCallback(){
cprintf(BLUE,"%d sending dummy\n");
#endif
frameIndex = -1;
acquisitionIndex = -1;
{
char buf[1000]="";memset(buf,0,1000);
sprintf(buf,jsonFmt,
SLS_DETECTOR_JSON_HEADER_VERSION, acquisitionIndex, frameIndex, dynamicRange, npixelsx, npixelsy,completeFileName[ithread],
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
char buf[MAX_STR_LENGTH]="";memset(buf,0xFF,1000);
int len = snprintf(buf,MAX_STR_LENGTH, jsonFmt,
SLS_DETECTOR_JSON_HEADER_VERSION,0,0,0,0,0, "", 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
//cprintf(BLUE,"%d:Dummy:%s.\n",ithread, buf);fflush(stdout);
zmq_send_const(zmqsocket, buf, len, 0);//ZMQ_SNDMORE);
zmq_send(zmqsocket, buf,1000, 0);
}
cprintf(BLUE,"%d dummy\n",ithread);
/* {
//send final data
zmq_send (zmqsocket, "end\n", 4, 0);
char buf[MAX_STR_LENGTH]="";memset(buf,0xFF,0);
int len = snprintf(buf,MAX_STR_LENGTH,"%s","end\n");
zmq_send_const(zmqsocket, buf,len, 0);
}*/
pthread_mutex_lock(&statusMutex);
dataCallbackThreadsMask^=(1<<ithread);
pthread_mutex_unlock(&statusMutex);
@ -2008,19 +2025,24 @@ void UDPStandardImplementation::startDataCallback(){
//update frame details
sls_detector_header* header = (sls_detector_header*) (latestData[ithread]);
uint64_t fnum = header->frameNumber;
frameIndex = fnum - startFrameIndex;
acquisitionIndex = fnum - startAcquisitionIndex;
{
char buf[1000]="";memset(buf,0,1000);
sprintf(buf,jsonFmt,
SLS_DETECTOR_JSON_HEADER_VERSION, acquisitionIndex, frameIndex, dynamicRange, npixelsx, npixelsy,completeFileName[ithread],
header->frameNumber, header->expLength, header->packetNumber, header->bunchId, header->timestamp,
header->modId, header->xCoord, header->yCoord, header->zCoord, header->debug, header->roundRNumber, header->detType, header->version);
uint64_t frameIndex = fnum - startFrameIndex;
uint64_t acquisitionIndex = fnum - startAcquisitionIndex;
zmq_send(zmqsocket, buf,1000, 0);
{
char buf[MAX_STR_LENGTH]="";memset(buf,0xFF,1000);
int len = snprintf(buf,MAX_STR_LENGTH,jsonFmt,
SLS_DETECTOR_JSON_HEADER_VERSION, acquisitionIndex, frameIndex, dynamicRange, npixelsx, npixelsy,completeFileName[ithread],1,
// 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
header->frameNumber, header->expLength, header->packetNumber, header->bunchId, header->timestamp,
header->modId, header->xCoord, header->yCoord, header->zCoord, header->debug, header->roundRNumber, header->detType, header->version
);
//cprintf(BLUE,"%d:%s.\n",ithread, buf);fflush(stdout);
zmq_send_const(zmqsocket, buf,len, ZMQ_SNDMORE);
}
//send data
zmq_send(zmqsocket, (latestData[ithread]+sizeof(sls_detector_header)), bufferSize, 0);
zmq_send_const(zmqsocket,(latestData[ithread]+sizeof(sls_detector_header)), bufferSize, 0);
//start clock after sending
if(!frameToGuiFrequency){
randomSendNow = false;
@ -2039,8 +2061,6 @@ void UDPStandardImplementation::startDataCallback(){
}/*--end of loop for each buffer (inner loop)*/
//free resources
delete[] buffer;
//end of acquisition, wait for next acquisition/change of parameters
sem_wait(&dataCallbackSemaphore[ithread]);
@ -2088,6 +2108,7 @@ void UDPStandardImplementation::startListening(){
carryonBufferSize = 0;
if(tempBuffer){delete []tempBuffer;tempBuffer=0;}
tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)](); //store maximum of 1 packets less in a frame
memset(tempBuffer,0,onePacketSize * (packetsPerFrame - 1));
}
/* inner loop - loop for each buffer */
@ -2102,7 +2123,7 @@ void UDPStandardImplementation::startListening(){
if(activated && !udpSocket[ithread]){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Listening_Thread %d: UDP Socket not created or shut down earlier ", ithread);
snprintf(cstreambuf, MAX_STR_LENGTH, "Listening_Thread %d: UDP Socket not created or shut down earlier ", ithread);
FILE_LOG(logERROR, cstreambuf);
}
stopListening(ithread,0);
@ -2249,6 +2270,11 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread)
//read first packet
pnum = FIRSTPNUM; //first packet number to validate
if(status != TRANSMITTING) rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset);
/*if(!ithread){
cprintf(BLUE,"%d first rc:%d\n",ithread, rc);fflush(stdout);
}else{
cprintf(GREEN,"%d first rc:%d\n",ithread, rc);fflush(stdout);
}*/
if(rc <= 0) return 0;
if(getFrameandPacketNumber(ithread,buffer[ithread] + offset,fi,pi,si,bi) == FAIL){
pi = ALL_MASK_32; //got 0 from fpga
@ -2276,7 +2302,7 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread)
if(!ithread) cout << "correct packet" << endl;
#endif
//copy only data
memcpy(buffer[ithread] + offset,buffer[ithread] + offset + headerlength, oneDataSize);
memmove(buffer[ithread] + offset,buffer[ithread] + offset + headerlength, oneDataSize);
offset+=oneDataSize;
//if complete frame
@ -2364,13 +2390,21 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread)
header->detType = (uint8_t) myDetectorType;
header->version = (uint8_t) SLS_DETECTOR_HEADER_VERSION;
#ifdef VERBOSE
if(!ithread)
//#ifdef VERBOSE
//if(!ithread)
/*if(!ithread) {
cprintf(BLUE,
"framenumber:%lu\tsubfnum:%u\tpnum:%u\tbunchid:%lu\txcoord:%u\tdettype:%u\tversion:%u\n",
header->frameNumber, header->expLength, header->packetNumber,
header->bunchId, header->xCoord, header->detType, header->version);
#endif
"%d framenumber:%lu\tsubfnum:%u\tpnum:%u\tbunchid:%lu\txcoord:%u\tdettype:%u\tversion:%u\n",
ithread, header->frameNumber, header->expLength, header->packetNumber,
header->bunchId, header->xCoord, header->detType, header->version);fflush(stdout);
}else{
cprintf(GREEN,
"%d framenumber:%lu\tsubfnum:%u\tpnum:%u\tbunchid:%lu\txcoord:%u\tdettype:%u\tversion:%u\n",
ithread, header->frameNumber, header->expLength, header->packetNumber,
header->bunchId, header->xCoord, header->detType, header->version);fflush(stdout);
}*/
//#endif
//write packet count to buffer
*((uint32_t*)(buffer[ithread])) = packetsPerFrame;
@ -2433,7 +2467,7 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){
//free empty buffer
if(numbytes <= 0){
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Listening %d: End of Acquisition", ithread);
snprintf(cstreambuf, MAX_STR_LENGTH, "Listening %d: End of Acquisition", ithread);
FILE_LOG(logINFO, cstreambuf);
while(!fifoFree[ithread]->push(buffer[ithread]));
}
@ -2470,7 +2504,7 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){
//#ifdef DEBUG4
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Listening Thread of %u got %d packets ", udpPortNum[ithread], totalListeningPacketCount[ithread]);
snprintf(cstreambuf, MAX_STR_LENGTH, "Listening Thread of %u got %d packets ", udpPortNum[ithread], totalListeningPacketCount[ithread]);
FILE_LOG(logINFO, cstreambuf);
}
//#endif
@ -2667,6 +2701,7 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread)
//create file
if((1<<ithread)&createFileMask){
memset(fileNamePerThread[ithread],0,MAX_STR_LENGTH);
//change the detector index in the file names
if(myDetectorType == EIGER){
int detindex = -1;
@ -2676,15 +2711,15 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread)
if (uscore!=string::npos){
if (sscanf(tempname.substr(uscore+1,tempname.size()-uscore-1).c_str(),"d%d",&detindex)) {
tempname=tempname.substr(0,uscore);
sprintf(fileNamePerThread[ithread],"%s_d%d",tempname.c_str(),detindex*2+ithread);
snprintf(fileNamePerThread[ithread],MAX_STR_LENGTH,"%s_d%d",tempname.c_str(),detindex*2+ithread);
}
}
//only one half module, so no detid
if(detindex == -1)
sprintf(fileNamePerThread[ithread],"%s_d%d",fileName,ithread);
snprintf(fileNamePerThread[ithread],MAX_STR_LENGTH,"%s_d%d",fileName,ithread);
}else
strcpy(fileNamePerThread[0],fileName);
snprintf(fileNamePerThread[0],MAX_STR_LENGTH,fileName);
if(dataCompressionEnable){
#ifdef MYROOT1
@ -2745,7 +2780,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
{
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Writing %d: End of Acquisition ", ithread);
snprintf(cstreambuf, MAX_STR_LENGTH, "Writing %d: End of Acquisition ", ithread);
FILE_LOG(logINFO, cstreambuf);
}
@ -2841,15 +2876,19 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
//thread 0 waits for all threads to finish & print statistics
if(ithread == 0){
/* cprintf(BLUE,"Thread 0: Waiting for all writer threads\n");*/
//wait for all other threads
while(writerThreadsMask)
usleep(5000);
/*cprintf(BLUE,"Thread 0: Waiting for all listener threads\n");*/
//ensure listening threads done before updating status as it returns to client (from stopReceiver)
while(listeningThreadsMask)
usleep(5000);
/*cprintf(BLUE,"Thread 0: Waiting for all datacallback threads\n");*/
//ensure datacallbacks threads are done
while(dataCallbackThreadsMask)
usleep(5000);
/*cprintf(BLUE,"Thread 0: Done, waiting for lock to say finished\n");*/
//update status
pthread_mutex_lock(&statusMutex);
status = RUN_FINISHED;
@ -2898,6 +2937,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack((totalPacketsCaught/(packetsPerFrame*numberofListeningThreads)), pAcquisitionFinished);
}
/*cprintf(BLUE,"Thread %d: Done\n", ithread);*/
}
@ -3207,7 +3247,8 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
void UDPStandardImplementation::updateFileHeader(int ithread){
//update file header
time_t t = time(0);
sprintf(fileHeader[ithread],
memset(fileHeader[ithread],0,FILE_HEADER_SIZE);
snprintf(fileHeader[ithread],FILE_HEADER_SIZE,
"\nHeader\t\t: %d bytes\n"
"Top\t\t: %d\n"
"Left\t\t: %d\n"
@ -3279,7 +3320,8 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32
//copy date
guiNumPackets[ithread] = numpackets;
strcpy(guiFileName[ithread],completeFileName[ithread]);
memset(guiFileName[ithread],0,MAX_STR_LENGTH);
snprintf(guiFileName[ithread],MAX_STR_LENGTH,completeFileName[ithread]);
if(excludeMissingPackets) //copy also the header
memcpy(latestData[ithread],buffer+ HEADER_SIZE_NUM_TOT_PACKETS, bufferSize + sizeof(sls_detector_header));
@ -3464,9 +3506,13 @@ int UDPStandardImplementation::getFrameandPacketNumber(int ithread, char* wbuffe
framenumber = 0;
char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH);
sprintf(cstreambuf, "Fifo %d: Frame Number is zero from firmware. ", ithread);
snprintf(cstreambuf, MAX_STR_LENGTH, "Fifo %d: Frame Number is zero from firmware. ", ithread);
FILE_LOG(logERROR, cstreambuf);
e_header = (eiger_packet_header_t*) (wbuffer);
subframenumber = *( (uint32_t*) e_header->subFrameNumber);
snprintf(cstreambuf, MAX_STR_LENGTH, "Fifo %d: Frame Number is zero from firmware. subnum: %u, fnum:%u, pnum:%u ", ithread, subframenumber,
(uint32_t)(*( (uint64_t*) footer)),(*( (uint16_t*) footer->packetNumber))-1);
FILE_LOG(logERROR, cstreambuf);
return FAIL;
}
packetnumber = (*( (uint16_t*) footer->packetNumber))-1;