in between

This commit is contained in:
Dhanya Maliakal 2016-09-05 10:09:49 +02:00
parent 4eceb3b5f7
commit 5f6b4c1b79
2 changed files with 190 additions and 17 deletions

View File

@ -289,6 +289,12 @@ private:
/*************************************************************************
* Listening and Writing Threads *****************************************
*************************************************************************/
/**
* Create Data Call Back Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
int createDataCallbackThreads(bool destroy = false);
/**
* Create Listening Threads
@ -303,6 +309,9 @@ private:
*/
int createWriterThreads(bool destroy = false);
/**
* Set Thread Priorities
*/
@ -336,6 +345,12 @@ private:
*/
int createCompressionFile(int ithread, int iframe);
/**
* Static function - Starts Data Callback Thread of this object
* @param this_pointer pointer to this object
*/
static void* startDataCallbackThread(void *this_pointer);
/**
* Static function - Starts Listening Thread of this object
* @param this_pointer pointer to this object
@ -348,6 +363,11 @@ private:
*/
static void* startWritingThread(void *this_pointer);
/**
* Thread that sends data packets to client
*/
void startDataCallback();
/**
* Thread that listens to packets
* It pops the fifofree for free addresses, listens to packets and pushes them into the fifo
@ -652,6 +672,24 @@ private:
//***data call back thread parameters***
/** Number of data callback Threads */
int numberofDataCallbackThreads;
/** Data Callback Threads */
pthread_t dataCallbackThreads[MAX_NUMBER_OF_LISTENING_THREADS];
/** Semaphores Synchronizing DataCallback Threads */
sem_t dataCallbackSemaphore[MAX_NUMBER_OF_LISTENING_THREADS];
/** Mask with each bit indicating status of each data callback thread */
volatile uint32_t dataCallbackThreadsMask;
/** Set to self-terminate data callback threads waiting for semaphores */
bool killAllDataCallbackThreads;
bool dataCallbackEnabled;
//***general and listening thread parameters***
/** Ensures if threads created successfully */
@ -669,9 +707,6 @@ private:
/** Semaphores Synchronizing Listening Threads */
sem_t listenSemaphore[MAX_NUMBER_OF_LISTENING_THREADS];
/** Current Listening Thread Index*/
int currentListeningThreadIndex;
/** Mask with each bit indicating status of each listening thread */
volatile uint32_t listeningThreadsMask;

View File

@ -186,6 +186,12 @@ void UDPStandardImplementation::initializeMembers(){
frametoGuiCounter[i] = 0;
}
//***data callback thread parameters***
numberofDataCallbackThreads = 1;
dataCallbackThreadsMask = 0x0;
killAllDataCallbackThreads = false;
dataCallbackEnabled = true; /**false*/
//***general and listening thread parameters***
threadStarted = false;
currentThreadIndex = -1;
@ -767,14 +773,17 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
//delete threads and set number of listening threads
if(myDetectorType == EIGER){
pthread_mutex_lock(&statusMutex);
dataCallbackThreadsMask = 0x0;
listeningThreadsMask = 0x0;
writerThreadsMask = 0x0;
pthread_mutex_unlock(&(statusMutex));
if(threadStarted){
createListeningThreads(true);
createDataCallbackThreads(true);
createWriterThreads(true);
}
numberofListeningThreads = MAX_NUMBER_OF_LISTENING_THREADS;
numberofDataCallbackThreads = MAX_NUMBER_OF_LISTENING_THREADS;
numberofWriterThreads = MAX_NUMBER_OF_WRITER_THREADS;
}
@ -793,7 +802,8 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
//updates File Header
if(myDetectorType == EIGER){
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++)
updateFileHeader(i);
updateFileHeader(i);
createDataCallbackThreads();
}
FILE_LOG(logDEBUG) << " Detector type set to " << getDetectorType(d);
@ -920,6 +930,10 @@ int UDPStandardImplementation::startReceiver(char *c){
listeningThreadsMask|=(1<<i);
for(int i=0;i<numberofWriterThreads;i++)
writerThreadsMask|=(1<<i);
if(dataCallbackEnabled){
for(int i=0;i<numberofDataCallbackThreads;i++)
dataCallbackThreadsMask|=(1<<i);
}
pthread_mutex_unlock(&(statusMutex));
@ -928,6 +942,10 @@ int UDPStandardImplementation::startReceiver(char *c){
sem_post(&listenSemaphore[i]);
for(int i=0; i < numberofWriterThreads; i++)
sem_post(&writerSemaphore[i]);
if(dataCallbackEnabled){
for(int i=0;i<numberofDataCallbackThreads;i++)
sem_post(&dataCallbackSemaphore[i]);
}
FILE_LOG(logINFO) << "Receiver Started";
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
@ -1185,6 +1203,55 @@ void UDPStandardImplementation::closeFile(int ithread){
* Listening and Writing Threads *****************************************
*************************************************************************/
int UDPStandardImplementation::createDataCallbackThreads(bool destroy){
FILE_LOG(logDEBUG) << __AT__ << " starting";
//reset masks
killAllDataCallbackThreads = false;
pthread_mutex_lock(&statusMutex);
dataCallbackThreadsMask = 0x0;
pthread_mutex_unlock(&(statusMutex));
//destroy
if(destroy){
FILE_LOG(logDEBUG) << "Info: Destroying Data Callback Thread(s)";
killAllDataCallbackThreads = true;
for(int i = 0; i < numberofDataCallbackThreads; ++i){
sem_post(&dataCallbackSemaphore[i]);
pthread_join(dataCallbackThreads[i],NULL);
FILE_LOG(logDEBUG) << "." << flush;
}
killAllDataCallbackThreads = false;
threadStarted = false;
FILE_LOG(logDEBUG) << "Info: Data Callback thread(s) destroyed";
}
//create
else{
FILE_LOG(logDEBUG) << "Info: Creating Data Callback Thread(s)";
//reset current index
currentThreadIndex = -1;
for(int i = 0; i < numberofDataCallbackThreads; ++i){
sem_init(&dataCallbackSemaphore[i],1,0);
threadStarted = false;
currentThreadIndex = i;
if(pthread_create(&dataCallbackThreads[i], NULL,startDataCallbackThread, (void*) this)){
FILE_LOG(logERROR) << "Could not create listening thread with index " << i;
return FAIL;
}
while(!threadStarted);
FILE_LOG(logDEBUG) << "." << flush;
}
FILE_LOG(logDEBUG) << "Info: Data Callback thread(s) created successfully.";
}
return OK;
}
int UDPStandardImplementation::createListeningThreads(bool destroy){
FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -1292,10 +1359,11 @@ int UDPStandardImplementation::createWriterThreads(bool destroy){
void UDPStandardImplementation::setThreadPriorities(){
FILE_LOG(logDEBUG) << __AT__ << " called";
struct sched_param tcp_param, listen_param, write_param;
struct sched_param tcp_param, listen_param, write_param, datacallback_param;
bool rights = true;
//assign priorities
datacallback_param.sched_priority = 55;
tcp_param.sched_priority = 50;
listen_param.sched_priority = 99;
write_param.sched_priority = 90;
@ -1314,13 +1382,20 @@ void UDPStandardImplementation::setThreadPriorities(){
break;
}
}
for(int i = 0; i < numberofDataCallbackThreads; ++i){
if(rights)
if (pthread_setschedparam(dataCallbackThreads[i], SCHED_RR, &datacallback_param) == EPERM){
rights = false;
break;
}
}
if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM)
rights = false;
if(!rights){
FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option.";
}else{
FILE_LOG(logINFO) << "Priorities set - TCP:50, Listening:99, Writing:90";
FILE_LOG(logINFO) << "Priorities set - DataCallback: 55, TCP:50, Listening:99, Writing:90";
}
}
@ -1543,6 +1618,14 @@ int UDPStandardImplementation::createCompressionFile(int ithread, int iframe){
void* UDPStandardImplementation::startDataCallbackThread(void* this_pointer){
FILE_LOG(logDEBUG) << __AT__ << " called";
((UDPStandardImplementation*)this_pointer)->startDataCallback();
return this_pointer;
}
void* UDPStandardImplementation::startListeningThread(void* this_pointer){
FILE_LOG(logDEBUG) << __AT__ << " called";
((UDPStandardImplementation*)this_pointer)->startListening();
@ -1560,6 +1643,62 @@ void* UDPStandardImplementation::startWritingThread(void* this_pointer){
void UDPStandardImplementation::startDataCallback(){
FILE_LOG(logDEBUG) << __AT__ << " called";
//set current thread value index
int ithread = currentThreadIndex;
//let calling function know thread started and obtained current
threadStarted = 1;
char* buffer;
// server address to bind
const char *hostName = "tcp://127.0.0.1:70001";/**increment this by ithread and detid*/
/* outer loop - loops once for each acquisition */
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
while(true){
void *context = zmq_ctx_new();
// create a publisher
socket = zmq_socket(context, ZMQ_PUB);
// bind
zmq_bind(socket,hostName);/**increment this by 1*/
/* inner loop - loop for each buffer */
//until mask reset (udp sockets shut down by client)
while((1 << ithread) & dataCallbackThreadsMask){
//wait for data
sem_wait(&dataCallbackSemaphore[ithread]);
if(status == TRANSMITTING)
continue;
int numpackets = (uint32_t)(*( (uint32_t*) latestData)); /*latestdata should be size of one buffer*/
memcpy(buffer, latestData, numpackets*onePacketSize);/**read first bytes to get numpackets*/
/*check if it should be added to previous (processlistening buffer for datacompression)*/
zmq_send(socket, buffer.data(), buffer.size(), 0);
}/*--end of loop for each buffer (inner loop)*/
//end of acquisition, wait for next acquisition/change of parameters
sem_wait(&dataCallbackSemaphore[ithread]);
//check to exit thread (for change of parameters) - only EXIT possibility
if(killAllDataCallbackThreads){
cprintf(BLUE,"DataCallback_Thread %d:Goodbye!\n",ithread);
//free resources at exit
zmq_unbind(socket, hostName);
zmq_close(socket);
zmq_ctx_destroy(context);
pthread_exit(NULL);
}
}/*--end of loop for each acquisition (outer loop) */
}
void UDPStandardImplementation::startListening(){
@ -2222,6 +2361,8 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
//ensure listening threads done before updating status as it returns to client (from stopReceiver)
while(listeningThreadsMask)
usleep(5000);
while(dataCallbackThreadsMask)
usleep(5000);
//update status
pthread_mutex_lock(&statusMutex);
status = RUN_FINISHED;
@ -2485,22 +2626,19 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32
FILE_LOG(logDEBUG) << __AT__ << " called";
//random read (gui not ready)
//need to toggle guiDataReady or the second frame wont be copied
if((!FrameToGuiFrequency) && (!guiData[ithread])){
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Resetting guiDataReady\n");
#endif
pthread_mutex_lock(&dataReadyMutex);
guiDataReady[ithread]=0;
pthread_mutex_unlock(&dataReadyMutex);
}
//if nthe frame, wait for your turn (1st frame always shown as its zero)
else if(FrameToGuiFrequency && ((frametoGuiCounter[ithread])%FrameToGuiFrequency));
if(FrameToGuiFrequency && ((frametoGuiCounter[ithread])%FrameToGuiFrequency));
//random read (gui ready) or nth frame read: gui needs data now or it is the first frame
else{
//tell datacallback to pick up data
sem_post(&dataCallbackSemaphore[ithread]);
memcpy(latestData[ithread],buffer , numpackets*onePacketSize);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Gui needs data now OR 1st frame\n");
#endif