mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-13 05:17:13 +02:00
bug fixed, rapidjson parse error
This commit is contained in:
@ -1053,6 +1053,7 @@ void UDPStandardImplementation::stopReceiver(){
|
|||||||
}
|
}
|
||||||
|
|
||||||
//semaphore destroy
|
//semaphore destroy
|
||||||
|
cout<<"gonna destroy writerguisemphore"<<endl;
|
||||||
for(int i=0; i < numberofWriterThreads; i++){
|
for(int i=0; i < numberofWriterThreads; i++){
|
||||||
sem_destroy(&writerGuiSemaphore[i]);
|
sem_destroy(&writerGuiSemaphore[i]);
|
||||||
sem_destroy(&dataCallbackWriterSemaphore[i]);
|
sem_destroy(&dataCallbackWriterSemaphore[i]);
|
||||||
@ -1665,18 +1666,19 @@ void* UDPStandardImplementation::startWritingThread(void* this_pointer){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data call back thread started %d\n",currentThreadIndex);
|
void UDPStandardImplementation::startDataCallback(){
|
||||||
|
cprintf(MAGENTA,"start data call back thread started %d\n",currentThreadIndex);
|
||||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||||
|
|
||||||
//set current thread value index
|
//set current thread value index
|
||||||
int ithread = currentThreadIndex;
|
int ithread = currentThreadIndex;
|
||||||
struct timespec begin,end;
|
struct timespec begin,end;
|
||||||
|
|
||||||
// server address to bind
|
// server address to bind
|
||||||
char hostName[100] = "tcp://127.0.0.1:";
|
char hostName[100] = "tcp://127.0.0.1:";
|
||||||
int portno = DEFAULT_ZMQ_PORTNO + (detID*2+ithread);
|
int portno = DEFAULT_ZMQ_PORTNO + (detID*2+ithread);
|
||||||
sprintf(hostName,"%s%d",hostName,portno);
|
sprintf(hostName,"%s%d",hostName,portno);
|
||||||
FILE_LOG(logINFO) << "Thread" << ithread << ": ZMQ Server at " << hostName;
|
FILE_LOG(logINFO) << "Thread" << ithread << ": ZMQ Server at " << hostName;
|
||||||
|
|
||||||
/* outer loop - loops once for each acquisition */
|
/* outer loop - loops once for each acquisition */
|
||||||
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
|
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
|
||||||
@ -1702,8 +1704,8 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
|
|||||||
if(!zmqThreadStarted)
|
if(!zmqThreadStarted)
|
||||||
zmqThreadStarted = true;
|
zmqThreadStarted = true;
|
||||||
|
|
||||||
const char *type = "float64";
|
const char *type = "float64";
|
||||||
const char *shape= "[1024, 512]";
|
const char *shape= "[1024, 512]";
|
||||||
|
|
||||||
/* inner loop - loop for each buffer */
|
/* inner loop - loop for each buffer */
|
||||||
//until mask reset (dummy pcaket got by writer)
|
//until mask reset (dummy pcaket got by writer)
|
||||||
@ -1711,6 +1713,7 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
|
|||||||
|
|
||||||
//let the writer thread continue, while we process carry over if any
|
//let the writer thread continue, while we process carry over if any
|
||||||
sem_post(&writerGuiSemaphore[ithread]);
|
sem_post(&writerGuiSemaphore[ithread]);
|
||||||
|
if(!ithread) cout<<"*** posted writerguisemiphore (callback)"<<endl;
|
||||||
//wait for receiver to send more data
|
//wait for receiver to send more data
|
||||||
sem_wait(&dataCallbackWriterSemaphore[ithread]);
|
sem_wait(&dataCallbackWriterSemaphore[ithread]);
|
||||||
|
|
||||||
@ -1718,25 +1721,29 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
|
|||||||
if(guiNumPackets[ithread] == dummyPacketValue){
|
if(guiNumPackets[ithread] == dummyPacketValue){
|
||||||
|
|
||||||
|
|
||||||
/**suing this in clientzmq_msg_more,
|
/**suing this in clientzmq_msg_more,
|
||||||
* in serve use zmq_msg_send (&message, sender, ZMQ_SNDMORE); and 0 for last packet, but better to check lengt*/
|
* in serve use zmq_msg_send (&message, sender, ZMQ_SNDMORE); and 0 for last packet, but better to check lengt*/
|
||||||
/*if (checkJoinThread()){for different scans
|
/*if (checkJoinThread()){for different scans
|
||||||
break;
|
break;
|
||||||
}*/
|
}*/
|
||||||
|
|
||||||
ostringstream header;
|
ostringstream header;
|
||||||
header << "{\"htype\":[\"chunk-1.0\"], "
|
header << "{\"htype\":[\"chunk-1.0\"], "
|
||||||
<< "\"type\":" << "\"" << type << "\", "
|
<< "\"type\":" << "\"" << type << "\", "
|
||||||
<< "\"shape\":" << shape
|
<< "\"shape\":" << shape
|
||||||
<< "}";
|
<< "}";
|
||||||
//send header
|
//cout<<ithread << "header:"<< header.str()<<endl;
|
||||||
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
|
//send header
|
||||||
sleep(1);
|
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
|
||||||
//send data
|
//send data
|
||||||
zmq_send (zmqsocket, "end", 3, 0);
|
zmq_send (zmqsocket, "end", 3, 0);
|
||||||
|
|
||||||
pthread_mutex_lock(&statusMutex);
|
pthread_mutex_lock(&statusMutex);
|
||||||
dataCallbackThreadsMask^=(1<<ithread);
|
dataCallbackThreadsMask^=(1<<ithread);
|
||||||
pthread_mutex_unlock(&statusMutex);
|
pthread_mutex_unlock(&statusMutex);
|
||||||
|
|
||||||
|
sem_post(&writerGuiSemaphore[ithread]);
|
||||||
|
if(!ithread) cout<<"*** posted writerguisemiphore (callback dummy)"<<endl;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1780,14 +1787,14 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
|
|||||||
memcpy(buffer+((pnum-1)*oneDataSize), latestData[ithread]+offset+8,oneDataSize);
|
memcpy(buffer+((pnum-1)*oneDataSize), latestData[ithread]+offset+8,oneDataSize);
|
||||||
offset+= onePacketSize;
|
offset+= onePacketSize;
|
||||||
ostringstream header;
|
ostringstream header;
|
||||||
header << "{\"htype\":[\"chunk-1.0\"], "
|
header << "{\"htype\":[\"chunk-1.0\"], "
|
||||||
<< "\"type\":" << "\"" << type << "\", "
|
<< "\"type\":" << "\"" << type << "\", "
|
||||||
<< "\"shape\":" << shape
|
<< "\"shape\":" << shape
|
||||||
<< "}";
|
<< "}";
|
||||||
//send header
|
//cout<<ithread << "header:"<< header.str()<<endl;
|
||||||
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
|
//send header
|
||||||
sleep(1);
|
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
|
||||||
//send data
|
//send data
|
||||||
zmq_send(zmqsocket, buffer, oneframesize, 0);
|
zmq_send(zmqsocket, buffer, oneframesize, 0);
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
cprintf(BLUE,"%d sent (last packet)\n",ithread);
|
cprintf(BLUE,"%d sent (last packet)\n",ithread);
|
||||||
@ -1805,17 +1812,17 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
|
|||||||
//next frame
|
//next frame
|
||||||
if(fnum > currentfnum){
|
if(fnum > currentfnum){
|
||||||
ostringstream header;
|
ostringstream header;
|
||||||
header << "{\"htype\":[\"chunk-1.0\"], "
|
header << "{\"htype\":[\"chunk-1.0\"], "
|
||||||
<< "\"type\":" << "\"" << type << "\", "
|
<< "\"type\":" << "\"" << type << "\", "
|
||||||
<< "\"shape\":" << shape
|
<< "\"shape\":" << shape
|
||||||
<< "}";
|
<< "}";
|
||||||
//send header
|
//cout<<ithread << "header:"<< header.str()<<endl;
|
||||||
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
|
//send header
|
||||||
sleep(1);
|
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
|
||||||
//send data
|
//send data
|
||||||
zmq_send(zmqsocket, buffer, oneframesize, 0);
|
zmq_send(zmqsocket, buffer, oneframesize, 0);
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
cprintf(BLUE,"%d sent (last packet)\n",ithread);
|
cprintf(BLUE,"%d sent (last packet)\n",ithread);
|
||||||
#endif
|
#endif
|
||||||
//start clock after sending
|
//start clock after sending
|
||||||
if(!frameToGuiFrequency){
|
if(!frameToGuiFrequency){
|
||||||
@ -1836,18 +1843,17 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
|
|||||||
|
|
||||||
}/*--end of loop for each buffer (inner loop)*/
|
}/*--end of loop for each buffer (inner loop)*/
|
||||||
|
|
||||||
//free resources
|
|
||||||
usleep(1000*1000);
|
|
||||||
delete[] buffer;
|
|
||||||
zmq_unbind(zmqsocket, hostName);
|
|
||||||
zmq_close(zmqsocket);
|
|
||||||
zmq_ctx_destroy(context);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//end of acquisition, wait for next acquisition/change of parameters
|
//end of acquisition, wait for next acquisition/change of parameters
|
||||||
sem_wait(&dataCallbackSemaphore[ithread]);
|
sem_wait(&dataCallbackSemaphore[ithread]);
|
||||||
|
|
||||||
|
//free resources (only at the next start so that socket not closed before client gets end of packet)
|
||||||
|
delete[] buffer;
|
||||||
|
zmq_unbind(zmqsocket, hostName);
|
||||||
|
zmq_close(zmqsocket);
|
||||||
|
zmq_ctx_destroy(context);
|
||||||
|
|
||||||
|
|
||||||
//check to exit thread (for change of parameters) - only EXIT possibility
|
//check to exit thread (for change of parameters) - only EXIT possibility
|
||||||
if(killAllDataCallbackThreads){
|
if(killAllDataCallbackThreads){
|
||||||
@ -2498,7 +2504,9 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
|
|||||||
|
|
||||||
if(dataStreamEnable){
|
if(dataStreamEnable){
|
||||||
//ensure previous frame was processed
|
//ensure previous frame was processed
|
||||||
|
if(!ithread) cout<<"*** waiting for writerguisemiphore (stopwriting)"<<endl;
|
||||||
sem_wait(&writerGuiSemaphore[ithread]);
|
sem_wait(&writerGuiSemaphore[ithread]);
|
||||||
|
if(!ithread) cout<<"*** got post for writerguisemiphore (stopwriting)"<<endl;
|
||||||
guiNumPackets[ithread] = dummyPacketValue;
|
guiNumPackets[ithread] = dummyPacketValue;
|
||||||
//let it know its got data
|
//let it know its got data
|
||||||
sem_post(&dataCallbackWriterSemaphore[ithread]);
|
sem_post(&dataCallbackWriterSemaphore[ithread]);
|
||||||
@ -2801,8 +2809,9 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32
|
|||||||
cprintf(GREEN,"Writing_Thread: CopyingFrame: Going to copy data\n");
|
cprintf(GREEN,"Writing_Thread: CopyingFrame: Going to copy data\n");
|
||||||
#endif
|
#endif
|
||||||
//ensure previous frame was processed
|
//ensure previous frame was processed
|
||||||
|
if(!ithread) cout<<"*** waiting for writerguisemiphore (copyfrmae)"<<endl;
|
||||||
sem_wait(&writerGuiSemaphore[ithread]);
|
sem_wait(&writerGuiSemaphore[ithread]);
|
||||||
|
if(!ithread) cout<<"*** got post for writerguisemiphore (copyframe)"<<endl;
|
||||||
//copy date
|
//copy date
|
||||||
guiNumPackets[ithread] = numpackets;
|
guiNumPackets[ithread] = numpackets;
|
||||||
strcpy(guiFileName[ithread],completeFileName[ithread]);
|
strcpy(guiFileName[ithread],completeFileName[ithread]);
|
||||||
|
Reference in New Issue
Block a user