mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-06 18:10:40 +02:00
merged zmqdata to developer
This commit is contained in:
commit
12574f4c05
@ -39,6 +39,12 @@ add_definitions(
|
|||||||
-DDACS_INT
|
-DDACS_INT
|
||||||
)
|
)
|
||||||
|
|
||||||
|
add_library(zmq STATIC IMPORTED )
|
||||||
|
|
||||||
|
set_target_properties(zmq PROPERTIES
|
||||||
|
IMPORTED_LOCATION ${CMAKE_CURRENT_SOURCE_DIR}/../slsReceiverSoftware/include/libzmq.a
|
||||||
|
)
|
||||||
|
|
||||||
add_library(slsDetectorStatic STATIC
|
add_library(slsDetectorStatic STATIC
|
||||||
${SOURCES}
|
${SOURCES}
|
||||||
${HEADERS}
|
${HEADERS}
|
||||||
|
@ -18,6 +18,9 @@ INCLUDES?= -IcommonFiles -IslsDetector -I../slsReceiverSoftware/MySocketTCP -Ius
|
|||||||
SRC_CLNT=slsDetectorAnalysis/fileIO.cpp usersFunctions/usersFunctions.cpp slsDetector/slsDetectorUtils.cpp slsDetector/slsDetectorCommand.cpp slsDetectorAnalysis/angularConversion.cpp slsDetectorAnalysis/angularConversionStatic.cpp slsDetectorAnalysis/energyConversion.cpp slsDetector/slsDetectorActions.cpp slsDetectorAnalysis/postProcessing.cpp slsDetector/slsDetector.cpp multiSlsDetector/multiSlsDetector.cpp slsDetectorAnalysis/postProcessingFuncs.cpp slsReceiverInterface/receiverInterface.cpp slsDetector/slsDetectorUsers.cpp threadFiles/CondVar.cpp threadFiles/Mutex.cpp threadFiles/ThreadPool.cpp #../slsReceiverSoftware/MySocketTCP/MySocketTCP.cpp
|
SRC_CLNT=slsDetectorAnalysis/fileIO.cpp usersFunctions/usersFunctions.cpp slsDetector/slsDetectorUtils.cpp slsDetector/slsDetectorCommand.cpp slsDetectorAnalysis/angularConversion.cpp slsDetectorAnalysis/angularConversionStatic.cpp slsDetectorAnalysis/energyConversion.cpp slsDetector/slsDetectorActions.cpp slsDetectorAnalysis/postProcessing.cpp slsDetector/slsDetector.cpp multiSlsDetector/multiSlsDetector.cpp slsDetectorAnalysis/postProcessingFuncs.cpp slsReceiverInterface/receiverInterface.cpp slsDetector/slsDetectorUsers.cpp threadFiles/CondVar.cpp threadFiles/Mutex.cpp threadFiles/ThreadPool.cpp #../slsReceiverSoftware/MySocketTCP/MySocketTCP.cpp
|
||||||
|
|
||||||
|
|
||||||
|
LIBZMQDIR = ../slsReceiverSoftware/include
|
||||||
|
LIBZMQ = -L$(LIBZMQDIR) -Wl,-rpath=$(LIBZMQDIR) -lzmq
|
||||||
|
|
||||||
$(info )
|
$(info )
|
||||||
$(info #######################################)
|
$(info #######################################)
|
||||||
$(info # Compiling slsDetectorSoftware #)
|
$(info # Compiling slsDetectorSoftware #)
|
||||||
@ -66,14 +69,14 @@ gotthardVirtualServer: $(SRC_MYTHEN_SVC)
|
|||||||
|
|
||||||
|
|
||||||
%.o : %.cpp %.h Makefile
|
%.o : %.cpp %.h Makefile
|
||||||
$(CXX) -o $@ -c $< $(INCLUDES) $(DFLAGS) -fPIC $(EPICSFLAGS) -lpthread #$(FLAGS)
|
$(CXX) -o $@ -c $< $(INCLUDES) $(DFLAGS) -fPIC $(EPICSFLAGS) -lpthread -lrt $(LIBZMQ) #$(FLAGS)
|
||||||
|
|
||||||
|
|
||||||
package: $(OBJS) $(DESTDIR)/libSlsDetector.so $(DESTDIR)/libSlsDetector.a
|
package: $(OBJS) $(DESTDIR)/libSlsDetector.so $(DESTDIR)/libSlsDetector.a
|
||||||
|
|
||||||
|
|
||||||
$(DESTDIR)/libSlsDetector.so: $(OBJS)
|
$(DESTDIR)/libSlsDetector.so: $(OBJS)
|
||||||
$(CXX) -shared -Wl,-soname,libSlsDetector.so -o libSlsDetector.so $(OBJS) -lc $(INCLUDES) $(DFLAGS) $(FLAGS) $(EPICSFLAGS) -L/usr/lib64 -lpthread
|
$(CXX) -shared -Wl,-soname,libSlsDetector.so -o libSlsDetector.so $(OBJS) -lc $(INCLUDES) $(DFLAGS) $(FLAGS) $(EPICSFLAGS) -L/usr/lib64 -lpthread -lrt $(LIBZMQ)
|
||||||
$(shell test -d $(DESTDIR) || mkdir -p $(DESTDIR))
|
$(shell test -d $(DESTDIR) || mkdir -p $(DESTDIR))
|
||||||
mv libSlsDetector.so $(DESTDIR)
|
mv libSlsDetector.so $(DESTDIR)
|
||||||
|
|
||||||
|
@ -15,7 +15,8 @@ using namespace std;
|
|||||||
|
|
||||||
|
|
||||||
#include "sls_detector_defs.h"
|
#include "sls_detector_defs.h"
|
||||||
|
#include <cstring>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
|
||||||
/** Error flags */
|
/** Error flags */
|
||||||
@ -68,6 +69,7 @@ using namespace std;
|
|||||||
#define RATE_CORRECTION_NO_TAU_PROVIDED 0x0000000001000000ULL
|
#define RATE_CORRECTION_NO_TAU_PROVIDED 0x0000000001000000ULL
|
||||||
#define PROGRAMMING_ERROR 0x0000000002000000ULL
|
#define PROGRAMMING_ERROR 0x0000000002000000ULL
|
||||||
#define RECEIVER_ACTIVATE 0x0000000004000000ULL
|
#define RECEIVER_ACTIVATE 0x0000000004000000ULL
|
||||||
|
#define DATA_STREAMING 0x0000000008000000ULL
|
||||||
|
|
||||||
// 0x00000000FFFFFFFFULL
|
// 0x00000000FFFFFFFFULL
|
||||||
/** @short class returning all error messages for error mask */
|
/** @short class returning all error messages for error mask */
|
||||||
@ -213,6 +215,10 @@ public:
|
|||||||
if(slsErrorMask&RECEIVER_ACTIVATE)
|
if(slsErrorMask&RECEIVER_ACTIVATE)
|
||||||
retval.append("Could not activate/deactivate receiver\n");
|
retval.append("Could not activate/deactivate receiver\n");
|
||||||
|
|
||||||
|
if(slsErrorMask&DATA_STREAMING)
|
||||||
|
retval.append("Could not set/reset Data Streaming\n");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//------------------------------------------------------ length of message
|
//------------------------------------------------------ length of message
|
||||||
|
|
||||||
@ -237,21 +243,22 @@ public:
|
|||||||
int64_t clearErrorMask(){errorMask=0;return errorMask;};
|
int64_t clearErrorMask(){errorMask=0;return errorMask;};
|
||||||
|
|
||||||
/** Gets the not added detector list
|
/** Gets the not added detector list
|
||||||
/returns list
|
/returns list
|
||||||
*/
|
*/
|
||||||
char* getNotAddedList(){return notAddedList;};
|
char* getNotAddedList(){return notAddedList;};
|
||||||
|
|
||||||
/** Append the detector to not added detector list
|
/** Append the detector to not added detector list
|
||||||
* @param name append to the list
|
* @param name append to the list
|
||||||
/returns list
|
/returns list
|
||||||
*/
|
*/
|
||||||
void appendNotAddedList(const char* name){strcat(notAddedList,name);strcat(notAddedList,"+");};
|
void appendNotAddedList(const char* name){strcat(notAddedList,name);strcat(notAddedList,"+");};
|
||||||
|
|
||||||
/** Clears not added detector list
|
/** Clears not added detector list
|
||||||
/returns error mask
|
/returns error mask
|
||||||
*/
|
*/
|
||||||
void clearNotAddedList(){strcpy(notAddedList,"");};
|
void clearNotAddedList(){strcpy(notAddedList,"");};
|
||||||
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
/** Error Mask */
|
/** Error Mask */
|
||||||
@ -259,6 +266,7 @@ protected:
|
|||||||
|
|
||||||
/** Detectors Not added List */
|
/** Detectors Not added List */
|
||||||
char notAddedList[MAX_STR_LENGTH];
|
char notAddedList[MAX_STR_LENGTH];
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* ERROR_DEFS_H_ */
|
#endif /* ERROR_DEFS_H_ */
|
||||||
|
@ -23,6 +23,8 @@ ID: $Id$
|
|||||||
#include <sys/shm.h>
|
#include <sys/shm.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <zmq.h>
|
||||||
|
#include <rapidjson/document.h> //to scan json header in zmq stream
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
|
||||||
@ -299,7 +301,7 @@ int multiSlsDetector::createThreadPool(){
|
|||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
#ifdef VERBOSE
|
#ifdef VERBOSE
|
||||||
cout << "Initialized Threadpool" << endl;
|
cout << "Initialized Threadpool " << threadpool << endl;
|
||||||
#endif
|
#endif
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -311,7 +313,7 @@ void multiSlsDetector::destroyThreadPool(){
|
|||||||
threadpool->destroy_threadpool();
|
threadpool->destroy_threadpool();
|
||||||
threadpool=0;
|
threadpool=0;
|
||||||
#ifdef VERBOSE
|
#ifdef VERBOSE
|
||||||
cout<<"Destroyed Threadpool"<<endl;
|
cout<<"Destroyed Threadpool "<< threadpool << endl;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -392,7 +394,6 @@ int multiSlsDetector::addSlsDetector(int id, int pos) {
|
|||||||
|
|
||||||
//set offsets
|
//set offsets
|
||||||
updateOffsets();
|
updateOffsets();
|
||||||
destroyThreadPool();
|
|
||||||
if(createThreadPool() == FAIL)
|
if(createThreadPool() == FAIL)
|
||||||
exit(-1);
|
exit(-1);
|
||||||
|
|
||||||
@ -654,7 +655,6 @@ int multiSlsDetector::setDetectorId(int ival, int pos){
|
|||||||
|
|
||||||
int multiSlsDetector::addSlsDetector(const char *name, int pos) {
|
int multiSlsDetector::addSlsDetector(const char *name, int pos) {
|
||||||
|
|
||||||
|
|
||||||
detectorType t=getDetectorType(string(name));
|
detectorType t=getDetectorType(string(name));
|
||||||
int online=0;
|
int online=0;
|
||||||
slsDetector *s=NULL;
|
slsDetector *s=NULL;
|
||||||
@ -866,7 +866,6 @@ int multiSlsDetector::removeSlsDetector(int pos) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
updateOffsets();
|
updateOffsets();
|
||||||
destroyThreadPool();
|
|
||||||
if(createThreadPool() == FAIL)
|
if(createThreadPool() == FAIL)
|
||||||
exit(-1);
|
exit(-1);
|
||||||
|
|
||||||
@ -1198,6 +1197,7 @@ slsDetectorDefs::detectorSettings multiSlsDetector::getSettings(int pos) {
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=posmin; idet<posmax; idet++){
|
for(int idet=posmin; idet<posmax; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
@ -1245,6 +1245,7 @@ slsDetectorDefs::detectorSettings multiSlsDetector::setSettings(detectorSettings
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=posmin; idet<posmax; idet++){
|
for(int idet=posmin; idet<posmax; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
@ -1628,6 +1629,7 @@ int multiSlsDetector::startAndReadAllNoWait(){
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=posmin; idet<posmax; idet++){
|
for(int idet=posmin; idet<posmax; idet++){
|
||||||
if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){
|
if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){
|
||||||
@ -3320,6 +3322,7 @@ char* multiSlsDetector::setNetworkParameter(networkParameter p, string s){
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
@ -3351,6 +3354,7 @@ char* multiSlsDetector::setNetworkParameter(networkParameter p, string s){
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return getNetworkParameter(p);
|
return getNetworkParameter(p);
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -3928,6 +3932,7 @@ int multiSlsDetector::executeTrimming(trimMode mode, int par1, int par2, int imo
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
@ -3994,6 +3999,7 @@ int multiSlsDetector::loadSettingsFile(string fname, int imod) {
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
@ -4068,6 +4074,7 @@ int multiSlsDetector::setAllTrimbits(int val, int imod){
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
@ -4117,6 +4124,7 @@ int multiSlsDetector::loadCalibrationFile(string fname, int imod) {
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
@ -4774,6 +4782,7 @@ int multiSlsDetector::startReceiver(){
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=posmin; idet<posmax; idet++){
|
for(int idet=posmin; idet<posmax; idet++){
|
||||||
if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){
|
if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){
|
||||||
@ -4835,6 +4844,7 @@ int multiSlsDetector::stopReceiver(){
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=posmin; idet<posmax; idet++){
|
for(int idet=posmin; idet<posmax; idet++){
|
||||||
if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){
|
if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){
|
||||||
@ -4919,7 +4929,6 @@ slsDetectorDefs::runStatus multiSlsDetector::getReceiverStatus(){
|
|||||||
int multiSlsDetector::getFramesCaughtByReceiver() {
|
int multiSlsDetector::getFramesCaughtByReceiver() {
|
||||||
int ret=0,ret1=0;
|
int ret=0,ret1=0;
|
||||||
|
|
||||||
|
|
||||||
if(thisMultiDetector->numberOfDetectors>10) {
|
if(thisMultiDetector->numberOfDetectors>10) {
|
||||||
if (detectors[0]){
|
if (detectors[0]){
|
||||||
ret =detectors[0]->getFramesCaughtByReceiver();
|
ret =detectors[0]->getFramesCaughtByReceiver();
|
||||||
@ -4981,99 +4990,353 @@ int multiSlsDetector::resetFramesCaught() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int multiSlsDetector::createReceivingDataThreads(bool destroy){
|
||||||
|
if(!destroy) cprintf(MAGENTA,"Going to create data threads\n");
|
||||||
|
else cprintf(MAGENTA,"Going to destroy data threads\n");
|
||||||
|
|
||||||
int* multiSlsDetector::readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex){
|
|
||||||
int nel=(thisMultiDetector->dataBytes)/sizeof(int);
|
int numReadouts = thisMultiDetector->numberOfDetectors;
|
||||||
if(nel <= 0){
|
if(getDetectorsType() == EIGER)
|
||||||
cout << "Multislsdetector databytes not valid :" << thisMultiDetector->dataBytes << endl;
|
numReadouts *= 2;
|
||||||
acquisitionIndex = -1;
|
|
||||||
return NULL;
|
|
||||||
|
//reset masks
|
||||||
|
killAllReceivingDataThreads = false;
|
||||||
|
|
||||||
|
//destroy
|
||||||
|
if(destroy){
|
||||||
|
#ifdef DEBUG
|
||||||
|
cout << "Destroying Receiving Data Thread(s)" << endl;
|
||||||
|
#endif
|
||||||
|
killAllReceivingDataThreads = true;
|
||||||
|
for(int i = 0; i < numReadouts; ++i){
|
||||||
|
sem_post(&sem_singlewait[i]);
|
||||||
|
pthread_join(receivingDataThreads[i],NULL);
|
||||||
|
sem_destroy(&sem_singlewait[i]);
|
||||||
|
sem_destroy(&sem_singledone[i]);
|
||||||
|
#ifdef DEBUG
|
||||||
|
cout << "." << flush << endl;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
killAllReceivingDataThreads = false;
|
||||||
|
threadStarted = false;
|
||||||
|
|
||||||
|
cout << "Destroyed Receiving Data Thread(s)" << endl;
|
||||||
}
|
}
|
||||||
int n,complete=OK;
|
|
||||||
int i,k,offsetX, offsetY, maxX, maxY; double dr;
|
|
||||||
int* retval=new int[nel];
|
|
||||||
int *retdet = NULL, *p=retval;
|
|
||||||
string fullFName="";
|
|
||||||
string ext="";
|
|
||||||
int index=-1,f_index=-1,p_index=-1,det_index=-1;
|
|
||||||
double sv0=-1,sv1=-1;
|
|
||||||
|
|
||||||
|
//create
|
||||||
|
else{
|
||||||
|
#ifdef DEBUG
|
||||||
|
cout << "Creating Receiving Data Thread(s)" << endl;
|
||||||
|
#endif
|
||||||
|
//reset current index
|
||||||
|
currentThreadIndex = -1;
|
||||||
|
for(int i = 0; i < numReadouts; ++i){
|
||||||
|
sem_init(&sem_singlewait[i],1,0);
|
||||||
|
sem_init(&sem_singledone[i],1,0);
|
||||||
|
threadStarted = false;
|
||||||
|
currentThreadIndex = i;
|
||||||
|
if(pthread_create(&receivingDataThreads[i], NULL,staticstartReceivingDataThread, (void*) this)){
|
||||||
|
cprintf(RED, "Could not create receiving data thread with index %d\n",i);
|
||||||
|
return FAIL;
|
||||||
|
}
|
||||||
|
while(!threadStarted);
|
||||||
|
#ifdef DEBUG
|
||||||
|
cout << "." << flush << endl;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
cout << "Receiving Data Thread(s) created" << endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
return OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void* multiSlsDetector::staticstartReceivingDataThread(void* this_pointer){
|
||||||
|
((multiSlsDetector*)this_pointer)->startReceivingDataThread();
|
||||||
|
//while(true);
|
||||||
|
|
||||||
|
return this_pointer;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void multiSlsDetector::startReceivingDataThread(){
|
||||||
|
|
||||||
|
int ithread = currentThreadIndex; //set current thread value index
|
||||||
|
|
||||||
|
//server details
|
||||||
|
char hostname[100] = "tcp://127.0.0.1:";
|
||||||
|
int portno = DEFAULT_ZMQ_PORTNO + ithread;
|
||||||
|
sprintf(hostname,"%s%d",hostname,portno);
|
||||||
|
|
||||||
|
//socket details
|
||||||
|
zmq_msg_t message;
|
||||||
|
void *context;
|
||||||
|
void *zmqsocket;
|
||||||
|
context = zmq_ctx_new();
|
||||||
|
zmqsocket = zmq_socket(context, ZMQ_PULL);
|
||||||
|
//int hwmval = 10;
|
||||||
|
//zmq_setsockopt(zmqsocket,ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower//should not drop last packets)
|
||||||
|
zmq_connect(zmqsocket, hostname);
|
||||||
|
cout << "ZMQ Client of " << ithread << " at " << hostname << endl;
|
||||||
|
cprintf(BLUE,"%d Created socket\n",ithread);
|
||||||
|
|
||||||
|
|
||||||
|
//initializations
|
||||||
|
int numReadoutPerDetector = 1;
|
||||||
|
bool jungfrau = false;
|
||||||
if(getDetectorsType() == EIGER){
|
if(getDetectorsType() == EIGER){
|
||||||
|
numReadoutPerDetector = 2;
|
||||||
|
}else if(getDetectorsType() == JUNGFRAU)
|
||||||
|
jungfrau = true;
|
||||||
|
int singleDatabytes = detectors[ithread/numReadoutPerDetector]->getDataBytes();
|
||||||
|
int nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int);
|
||||||
|
int* image = new int[nel];
|
||||||
|
int len,idet = 0;
|
||||||
|
singleframe[ithread]=NULL;
|
||||||
|
threadStarted = true; //let calling function know thread started and obtained current
|
||||||
|
|
||||||
|
|
||||||
|
//infinite loop, exited only (if gui restarted/ enabledatastreaming called)
|
||||||
|
while(true){
|
||||||
|
|
||||||
|
|
||||||
|
sem_wait(&sem_singlewait[ithread]); //wait for it to be copied
|
||||||
|
//check to exit thread
|
||||||
|
if(killAllReceivingDataThreads){
|
||||||
|
delete [] singleframe[ithread];
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
//scan header-------------------------------------------------------------------
|
||||||
|
zmq_msg_init (&message);
|
||||||
|
len = zmq_msg_recv(&message, zmqsocket, 0);
|
||||||
|
if (len == -1) {
|
||||||
|
cprintf(BG_RED,"Could not read header for socket %d\n",ithread);
|
||||||
|
zmq_msg_close(&message);
|
||||||
|
cprintf(RED, "%d message null\n",ithread);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// error if you print it
|
||||||
|
// cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
|
||||||
|
//cprintf(BLUE,"%d header %d\n",ithread,len);
|
||||||
|
rapidjson::Document d;
|
||||||
|
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
|
||||||
|
#ifdef VERYVERBOSE
|
||||||
|
// htype is an array of strings
|
||||||
|
rapidjson::Value::Array htype = d["htype"].GetArray();
|
||||||
|
for(int i=0; i< htype.Size(); i++)
|
||||||
|
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
|
||||||
|
// shape is an array of ints
|
||||||
|
rapidjson::Value::Array shape = d["shape"].GetArray();
|
||||||
|
cout << ithread << "shape: ";
|
||||||
|
for(int i=0; i< shape.Size(); i++)
|
||||||
|
cout << ithread << shape[i].GetInt() << " ";
|
||||||
|
cout << endl;
|
||||||
|
|
||||||
|
cout << ithread << "type: " << d["type"].GetString() << endl;
|
||||||
|
|
||||||
|
#endif
|
||||||
|
if(!ithread){
|
||||||
|
currentAcquisitionIndex = d["acqIndex"].GetInt();
|
||||||
|
currentFrameIndex = d["fIndex"].GetInt();
|
||||||
|
currentSubFrameIndex = d["subfnum"].GetInt();
|
||||||
|
strcpy(currentFileName ,d["fname"].GetString());
|
||||||
|
#ifdef VERYVERBOSE
|
||||||
|
cout << "Acquisition index: " << currentAcquisitionIndex << endl;
|
||||||
|
cout << "Frame index: " << currentFrameIndex << endl;
|
||||||
|
cout << "Subframe index: " << currentSubFrameIndex << endl;
|
||||||
|
cout << "File name: " << currentFileName << endl;
|
||||||
|
#endif
|
||||||
|
if(currentFrameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
|
||||||
|
}
|
||||||
|
singleframe[ithread]=image;
|
||||||
|
|
||||||
|
// close the message
|
||||||
|
zmq_msg_close(&message);
|
||||||
|
|
||||||
|
|
||||||
|
//scan data-------------------------------------------------------------------
|
||||||
|
zmq_msg_init (&message);
|
||||||
|
len = zmq_msg_recv(&message, zmqsocket, 0);
|
||||||
|
|
||||||
|
//cprintf(BLUE,"%d data %d\n",ithread,len);
|
||||||
|
//end of socket ("end")
|
||||||
|
if (len < 1024*256 ) {
|
||||||
|
if(len == 3){
|
||||||
|
//cprintf(RED,"Received end of acquisition for socket %d\n", ithread);
|
||||||
|
singleframe[ithread] = NULL;
|
||||||
|
//break;
|
||||||
|
}else{
|
||||||
|
cprintf(RED,"Received weird packet size %d in socket for %d\n", len, ithread);
|
||||||
|
memset((char*)(singleframe[ithread]),0xFF,singleDatabytes/numReadoutPerDetector);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
//actual data
|
||||||
|
//cprintf(BLUE,"%d actual dataaa\n",ithread);
|
||||||
|
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
|
||||||
|
|
||||||
|
|
||||||
|
//jungfrau masking adcval
|
||||||
|
if(jungfrau){
|
||||||
|
for(unsigned int i=0;i<nel;i++){
|
||||||
|
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sem_post(&sem_singledone[ithread]);//let multi know is ready
|
||||||
|
zmq_msg_close(&message); // close the message
|
||||||
|
}
|
||||||
|
|
||||||
|
cprintf(RED,"%d Closing socket\n",ithread);
|
||||||
|
//close socket
|
||||||
|
zmq_disconnect(zmqsocket, hostname);
|
||||||
|
zmq_close(zmqsocket);
|
||||||
|
zmq_ctx_destroy(context);
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
cprintf(MAGENTA,"Receiving Data Thread %d:Goodbye!\n",ithread);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void multiSlsDetector::readFrameFromReceiver(){
|
||||||
|
|
||||||
|
//determine number of half readouts and maxX and maxY
|
||||||
|
int maxX=0,maxY=0;
|
||||||
|
int numReadoutPerDetector = 1;
|
||||||
|
if(getDetectorsType() == EIGER){
|
||||||
|
numReadoutPerDetector = 2;
|
||||||
maxX = thisMultiDetector->numberOfChannel[X];
|
maxX = thisMultiDetector->numberOfChannel[X];
|
||||||
maxY = thisMultiDetector->numberOfChannel[Y];
|
maxY = thisMultiDetector->numberOfChannel[Y];
|
||||||
}
|
}
|
||||||
|
int numReadouts = numReadoutPerDetector * thisMultiDetector->numberOfDetectors;
|
||||||
|
|
||||||
|
//initializing variables
|
||||||
|
strcpy(currentFileName,"");
|
||||||
|
currentAcquisitionIndex = -1;
|
||||||
|
currentFrameIndex = -1;
|
||||||
|
currentSubFrameIndex = -1;
|
||||||
|
|
||||||
|
|
||||||
for (int id=0; id<thisMultiDetector->numberOfDetectors; id++) {
|
//getting values
|
||||||
if (detectors[id]) {
|
int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0;
|
||||||
n=detectors[id]->getDataBytes();
|
if(detectors[0]){
|
||||||
retdet=detectors[id]->readFrameFromReceiver(fName, acquisitionIndex, frameIndex, subFrameIndex);
|
slsdatabytes = detectors[0]->getDataBytes();
|
||||||
if(detectors[id]->getErrorMask())
|
slsmaxchannels = detectors[0]->getMaxNumberOfChannels();
|
||||||
setErrorMask(getErrorMask()|(1<<id));
|
bytesperchannel = slsdatabytes/slsmaxchannels;
|
||||||
if (retdet){
|
slsmaxX = detectors[0]->getTotalNumberOfChannels(X);
|
||||||
if (acquisitionIndex==-1){
|
slsmaxY = detectors[0]->getTotalNumberOfChannels(Y);
|
||||||
complete = FAIL;
|
}
|
||||||
delete [] retdet;
|
int nel=(thisMultiDetector->dataBytes)/sizeof(int);
|
||||||
}else{
|
if(nel <= 0){
|
||||||
n=detectors[id]->getDataBytes();
|
cprintf(RED,"Error: Multislsdetector databytes not valid : %d\n", thisMultiDetector->dataBytes);
|
||||||
if(getDetectorsType() == EIGER){
|
return;
|
||||||
//cout << "fname:"<<fName<<" findex:"<<fIndex<<endl;//cout<<"n:"<<n<<endl;//cout<<"maxchan:"<<detectors[id]->getMaxNumberOfChannels()<<" n:"<<n<<endl;
|
}
|
||||||
dr = (double)n/detectors[id]->getMaxNumberOfChannels();
|
int* multiframe=new int[nel];
|
||||||
k=(int)(detectors[id]->getMaxNumberOfChannels(X)*dr);//bit mode
|
int* p = multiframe;
|
||||||
//cout << "dr:"<<dr<<endl;//cout << "k:"<<k<<endl;
|
int idet,offsetY,offsetX;
|
||||||
offsetY = (int)(((maxY - (thisMultiDetector->offsetY[id] + detectors[id]->getMaxNumberOfChannels(Y))) * maxX)*dr);//bit mode
|
int halfreadoutoffset = (slsmaxX/numReadoutPerDetector);
|
||||||
offsetX = (int)(thisMultiDetector->offsetX[id]*dr);
|
int nx =getTotalNumberOfChannels(slsDetectorDefs::X);
|
||||||
//cout << "offsetY"<<offsetY<< " offsetX:"<<offsetX<<endl;
|
int ny =getTotalNumberOfChannels(slsDetectorDefs::Y);
|
||||||
for(i=0; i< 256;i++){
|
|
||||||
memcpy((((char*)p) + offsetY + offsetX + ((int)(i*maxX*dr))) ,(((char*)retdet) + (i*k)),k);//bit mode
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else{
|
|
||||||
memcpy(p,retdet,n);
|
|
||||||
p+=n/sizeof(int);
|
|
||||||
}
|
|
||||||
|
|
||||||
delete [] retdet;
|
|
||||||
//concatenate filenames
|
|
||||||
if(!fullFName.length()){
|
|
||||||
//assign file prefix
|
|
||||||
fullFName.assign(fileIO::getFileName());
|
|
||||||
if (strrchr(fName,'.')!=NULL){
|
|
||||||
ext.assign(fName);
|
|
||||||
size_t dot = ext.rfind(".");
|
|
||||||
if(dot != string::npos)
|
|
||||||
ext = ext.erase(0,dot);
|
|
||||||
else
|
|
||||||
ext = "";
|
|
||||||
|
|
||||||
//get variables
|
volatile uint64_t dataThreadMask = 0x0;
|
||||||
fileIOStatic::getVariablesFromFileName(fName,index, f_index, p_index, sv0, sv1, det_index);
|
for(int i = 0; i < numReadouts; ++i)
|
||||||
//append scan and det variables
|
dataThreadMask|=(1<<i);
|
||||||
fullFName.append(fileIOStatic::getReceiverFileNameToConcatenate(fName));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//append only if scan variables are different
|
//construct complete image and send to callback
|
||||||
if(!fileIOStatic::verifySameFrame(fName,index,f_index, p_index, sv0, sv1, det_index)){
|
while(true){
|
||||||
fullFName.append(fileIOStatic::getReceiverFileNameToConcatenate(fName));
|
memset(((char*)multiframe),0xFF,slsdatabytes*thisMultiDetector->numberOfDetectors); //reset frame memory
|
||||||
}
|
|
||||||
}
|
//post all of them to start
|
||||||
}else {
|
for(int ireadout=0; ireadout<numReadouts; ++ireadout){
|
||||||
#ifdef VERBOSE
|
if((1 << ireadout) & dataThreadMask){
|
||||||
cout << "Receiver for detector " << id << " does not have data left " << endl;
|
sem_post(&sem_singlewait[ireadout]); //sls to continue
|
||||||
#endif
|
|
||||||
delete [] retval;
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//get each frame
|
||||||
|
for(int ireadout=0; ireadout<numReadouts; ++ireadout){
|
||||||
|
//cprintf(BLUE,"multi checking %d mask:0x%x\n",ireadout,receivingDataThreadMask);
|
||||||
|
idet = ireadout/numReadoutPerDetector;
|
||||||
|
if((1 << ireadout) & dataThreadMask){ //if running
|
||||||
|
|
||||||
|
sem_wait(&sem_singledone[ireadout]); //wait for sls to copy
|
||||||
|
|
||||||
|
//this socket closed
|
||||||
|
if(singleframe[ireadout] == NULL){ //if got nothing
|
||||||
|
dataThreadMask^=(1<<ireadout);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
//assemble data
|
||||||
|
if(maxX){ //eiger, so interleaving between ports in one readout itself
|
||||||
|
offsetY = (maxY - (thisMultiDetector->offsetY[idet] + slsmaxY)) * maxX * bytesperchannel;
|
||||||
|
//the left half or right half
|
||||||
|
if(!(ireadout%numReadoutPerDetector))
|
||||||
|
offsetX = thisMultiDetector->offsetX[idet];
|
||||||
|
else
|
||||||
|
offsetX = thisMultiDetector->offsetX[idet] + halfreadoutoffset;
|
||||||
|
offsetX *= bytesperchannel;
|
||||||
|
//cprintf(BLUE,"offsetx:%d offsety:%d maxx:%d slsmaxX:%d slsmaxY:%d bytesperchannel:%d\n",
|
||||||
|
// offsetX,offsetY,maxX,slsmaxX,slsmaxY,bytesperchannel);
|
||||||
|
// cprintf(BLUE,"copying bytes:%d\n", (slsmaxX/numReadoutPerDetector)*bytesperchannel);
|
||||||
|
//itnerleaving with other detectors
|
||||||
|
|
||||||
|
//bottom
|
||||||
|
if(((idet+1)%2) == 0){
|
||||||
|
for(int i=0;i<slsmaxY;++i)
|
||||||
|
memcpy(((char*)multiframe) + offsetY + offsetX + ((slsmaxY-i)*maxX*bytesperchannel),
|
||||||
|
(char*)singleframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel,
|
||||||
|
(slsmaxX/numReadoutPerDetector)*bytesperchannel);
|
||||||
|
}
|
||||||
|
//top
|
||||||
|
else{
|
||||||
|
for(int i=0;i<slsmaxY;++i)
|
||||||
|
memcpy(((char*)multiframe) + offsetY + offsetX + (i*maxX*bytesperchannel),
|
||||||
|
(char*)singleframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel,
|
||||||
|
(slsmaxX/numReadoutPerDetector)*bytesperchannel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//no interleaving, just add to the end
|
||||||
|
//numReadout always 1 here
|
||||||
|
else{
|
||||||
|
memcpy(p,multiframe,slsdatabytes);
|
||||||
|
p+=slsdatabytes/sizeof(int);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//all done
|
||||||
|
if(!dataThreadMask)
|
||||||
|
break;
|
||||||
|
|
||||||
|
|
||||||
|
//send data to callback
|
||||||
|
fdata = decodeData(multiframe);
|
||||||
|
if ((fdata) && (dataReady)){
|
||||||
|
thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentFileName,nx,ny);
|
||||||
|
dataReady(thisData, currentFrameIndex, currentSubFrameIndex, pCallbackArg);//should be fnum and subfnum from json header
|
||||||
|
delete thisData;
|
||||||
|
fdata = NULL;
|
||||||
|
//cout<<"Send frame #"<< currentFrameIndex << " to gui"<<endl;
|
||||||
|
}
|
||||||
|
setCurrentProgress(currentAcquisitionIndex+1);
|
||||||
}
|
}
|
||||||
//append extension
|
|
||||||
fullFName.append(ext);
|
//free resources
|
||||||
strcpy(fName,fullFName.c_str());
|
delete[] multiframe;
|
||||||
//if some of the receivers did not give data, dont count it
|
}
|
||||||
if((getDetectorsType() == EIGER) &&(complete ==FAIL))
|
|
||||||
acquisitionIndex = -1;
|
|
||||||
return retval;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -5276,7 +5539,7 @@ int multiSlsDetector::calibratePedestal(int frames){
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int multiSlsDetector::setReadReceiverFrequency(int getFromReceiver,int i){
|
int multiSlsDetector::setReadReceiverFrequency(int getFromReceiver, int freq){
|
||||||
int ret=-100, ret1;
|
int ret=-100, ret1;
|
||||||
|
|
||||||
if(!getFromReceiver)
|
if(!getFromReceiver)
|
||||||
@ -5284,7 +5547,7 @@ int multiSlsDetector::setReadReceiverFrequency(int getFromReceiver,int i){
|
|||||||
|
|
||||||
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
|
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
|
||||||
if (detectors[idet]) {
|
if (detectors[idet]) {
|
||||||
ret1=detectors[idet]->setReadReceiverFrequency(getFromReceiver,i);
|
ret1=detectors[idet]->setReadReceiverFrequency(getFromReceiver,freq);
|
||||||
if(detectors[idet]->getErrorMask())
|
if(detectors[idet]->getErrorMask())
|
||||||
setErrorMask(getErrorMask()|(1<<idet));
|
setErrorMask(getErrorMask()|(1<<idet));
|
||||||
if (ret==-100)
|
if (ret==-100)
|
||||||
@ -5300,6 +5563,47 @@ int multiSlsDetector::setReadReceiverFrequency(int getFromReceiver,int i){
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// only called from gui or that wants zmq data packets
|
||||||
|
int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
|
||||||
|
|
||||||
|
if(enable >= 0){
|
||||||
|
|
||||||
|
if(threadStarted != enable){
|
||||||
|
//destroy data threads
|
||||||
|
if(threadStarted)
|
||||||
|
createReceivingDataThreads(true);
|
||||||
|
|
||||||
|
//create data threads
|
||||||
|
if(enable > 0){
|
||||||
|
if(createReceivingDataThreads() == FAIL){
|
||||||
|
std::cout << "Could not create data threads in client. Aborting creating data threads in receiver" << std::endl;
|
||||||
|
//only for the first det as theres no general one
|
||||||
|
setErrorMask(getErrorMask()|(1<<0));
|
||||||
|
detectors[0]->setErrorMask((detectors[0]->getErrorMask())|(DATA_STREAMING));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}else enable = threadStarted;
|
||||||
|
int ret=-100, ret1;
|
||||||
|
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
|
||||||
|
if (detectors[idet]) {
|
||||||
|
ret1=detectors[idet]->enableDataStreamingFromReceiver(enable);
|
||||||
|
if(detectors[idet]->getErrorMask())
|
||||||
|
setErrorMask(getErrorMask()|(1<<idet));
|
||||||
|
if (ret==-100)
|
||||||
|
ret=ret1;
|
||||||
|
else if (ret!=ret1)
|
||||||
|
ret=-1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
if(enable == -1)
|
||||||
|
return threadStarted;
|
||||||
|
*/
|
||||||
|
return (threadStarted & ret);
|
||||||
|
}
|
||||||
|
|
||||||
int multiSlsDetector::enableReceiverCompression(int i){
|
int multiSlsDetector::enableReceiverCompression(int i){
|
||||||
int ret=-100,ret1;
|
int ret=-100,ret1;
|
||||||
@ -5506,6 +5810,7 @@ int multiSlsDetector::pulsePixel(int n,int x,int y) {
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
@ -5542,6 +5847,7 @@ int multiSlsDetector::pulsePixelNMove(int n,int x,int y) {
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
@ -5578,6 +5884,7 @@ int multiSlsDetector::pulseChip(int n) {
|
|||||||
threadpool->add_task(task);
|
threadpool->add_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
threadpool->startExecuting();
|
||||||
threadpool->wait_for_tasks_to_complete();
|
threadpool->wait_for_tasks_to_complete();
|
||||||
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
|
||||||
if(detectors[idet]){
|
if(detectors[idet]){
|
||||||
|
@ -1181,18 +1181,21 @@ class multiSlsDetector : public slsDetectorUtils {
|
|||||||
/**
|
/**
|
||||||
* resets framescaught
|
* resets framescaught
|
||||||
* @param index frames caught by receiver
|
* @param index frames caught by receiver
|
||||||
*/
|
*/
|
||||||
int resetFramesCaught();
|
int resetFramesCaught();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads a frame from receiver
|
* Create Receiving Data Threads
|
||||||
* @param fName file name of current frame()
|
* @param destroy is true to destroy all the threads
|
||||||
* @param acquisitionIndex current acquisition index
|
* @return OK or FAIL
|
||||||
* @param frameIndex current frame index (for each scan)
|
*/
|
||||||
* @param subFrameIndex current sub frame index (for 32 bit mode for eiger)
|
int createReceivingDataThreads(bool destroy = false);
|
||||||
/returns a frame read from recever
|
|
||||||
|
|
||||||
|
|
||||||
|
/** Reads frames from receiver through a constant socket
|
||||||
*/
|
*/
|
||||||
int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex);
|
void readFrameFromReceiver();
|
||||||
|
|
||||||
/** Locks/Unlocks the connection to the receiver
|
/** Locks/Unlocks the connection to the receiver
|
||||||
/param lock sets (1), usets (0), gets (-1) the lock
|
/param lock sets (1), usets (0), gets (-1) the lock
|
||||||
@ -1253,14 +1256,20 @@ class multiSlsDetector : public slsDetectorUtils {
|
|||||||
int calibratePedestal(int frames = 0);
|
int calibratePedestal(int frames = 0);
|
||||||
|
|
||||||
/** Sets the read receiver frequency
|
/** Sets the read receiver frequency
|
||||||
if Receiver read upon gui request, readRxrFrequency=0,
|
if data required from receiver randomly readRxrFrequency=0,
|
||||||
else every nth frame to be sent to gui
|
else every nth frame to be sent to gui
|
||||||
@param getFromReceiver is 1 if it should ask the receiver,
|
@param getFromReceiver is 1 if it should ask the receiver,
|
||||||
0 if it can get it from multislsdetecter
|
0 if it can get it from multi structure
|
||||||
@param i is the receiver read frequency
|
@param freq is the receiver read frequency
|
||||||
/returns read receiver frequency
|
/returns read receiver frequency
|
||||||
*/
|
*/
|
||||||
int setReadReceiverFrequency(int getFromReceiver, int i=-1);
|
int setReadReceiverFrequency(int getFromReceiver, int freq=-1);
|
||||||
|
|
||||||
|
/** Enable or disable streaming data from receiver to client
|
||||||
|
* @param enable 0 to disable 1 to enable -1 to only get the value
|
||||||
|
* @returns data streaming
|
||||||
|
*/
|
||||||
|
int enableDataStreamingFromReceiver(int enable=-1);
|
||||||
|
|
||||||
/** updates the multidetector offsets */
|
/** updates the multidetector offsets */
|
||||||
void updateOffsets();
|
void updateOffsets();
|
||||||
@ -1363,11 +1372,36 @@ class multiSlsDetector : public slsDetectorUtils {
|
|||||||
bool getAcquiringFlag();
|
bool getAcquiringFlag();
|
||||||
|
|
||||||
|
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* Static function - Starts Data Thread of this object
|
||||||
|
* @param this_pointer pointer to this object
|
||||||
|
*/
|
||||||
|
static void* staticstartReceivingDataThread(void *this_pointer);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thread that receives data packets from receiver
|
||||||
|
*/
|
||||||
|
void startReceivingDataThread();
|
||||||
|
|
||||||
|
/* synchronizing between zmq threads */
|
||||||
|
sem_t sem_singledone[MAXDET];
|
||||||
|
sem_t sem_singlewait[MAXDET];
|
||||||
|
int* singleframe[MAXDET];
|
||||||
|
|
||||||
|
/* Parameters given to the gui picked up from zmq threads*/
|
||||||
|
int currentAcquisitionIndex;
|
||||||
|
int currentFrameIndex;
|
||||||
|
int currentSubFrameIndex;
|
||||||
|
char currentFileName[MAX_STR_LENGTH];
|
||||||
|
|
||||||
|
pthread_t receivingDataThreads[MAXDET];
|
||||||
|
/** Ensures if threads created successfully */
|
||||||
|
bool threadStarted;
|
||||||
|
/** Current Thread Index*/
|
||||||
|
int currentThreadIndex;
|
||||||
|
/** Set to self-terminate data receiving threads waiting for semaphores */
|
||||||
|
bool killAllReceivingDataThreads;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
@ -1385,7 +1419,6 @@ class multiSlsDetector : public slsDetectorUtils {
|
|||||||
ThreadPool* threadpool;
|
ThreadPool* threadpool;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,7 +11,6 @@
|
|||||||
#include <math.h>
|
#include <math.h>
|
||||||
#include "gitInfoLib.h"
|
#include "gitInfoLib.h"
|
||||||
|
|
||||||
|
|
||||||
int slsDetector::initSharedMemory(detectorType type, int id) {
|
int slsDetector::initSharedMemory(detectorType type, int id) {
|
||||||
|
|
||||||
|
|
||||||
@ -3987,9 +3986,9 @@ int64_t slsDetector::setTimer(timerIndex index, int64_t t){
|
|||||||
if (index!=MEASUREMENTS_NUMBER) {
|
if (index!=MEASUREMENTS_NUMBER) {
|
||||||
|
|
||||||
|
|
||||||
//#ifdef VERBOSE
|
#ifdef VERBOSE
|
||||||
std::cout<< "Setting timer "<< index << " to " << t << "ns/value" << std::endl;
|
std::cout<< "Setting timer "<< index << " to " << t << "ns/value" << std::endl;
|
||||||
//#endif
|
#endif
|
||||||
if (thisDetector->onlineFlag==ONLINE_FLAG) {
|
if (thisDetector->onlineFlag==ONLINE_FLAG) {
|
||||||
if (connectControl() == OK){
|
if (connectControl() == OK){
|
||||||
controlSocket->SendDataOnly(&fnum,sizeof(fnum));
|
controlSocket->SendDataOnly(&fnum,sizeof(fnum));
|
||||||
@ -5556,6 +5555,7 @@ char* slsDetector::setReceiver(string receiverIP){
|
|||||||
std::cout << "frame number:" << thisDetector->timerValue[FRAME_NUMBER] << endl;
|
std::cout << "frame number:" << thisDetector->timerValue[FRAME_NUMBER] << endl;
|
||||||
std::cout << "dynamic range:" << thisDetector->dynamicRange << endl << endl;
|
std::cout << "dynamic range:" << thisDetector->dynamicRange << endl << endl;
|
||||||
std::cout << "10GbE:" << thisDetector->tenGigaEnable << endl << endl;
|
std::cout << "10GbE:" << thisDetector->tenGigaEnable << endl << endl;
|
||||||
|
//std::cout << "dataStreaming:" << enableDataStreamingFromReceiver(-1) << endl << endl;
|
||||||
/** enable compresison, */
|
/** enable compresison, */
|
||||||
#endif
|
#endif
|
||||||
if(setDetectorType()!= GENERIC){
|
if(setDetectorType()!= GENERIC){
|
||||||
@ -5580,6 +5580,8 @@ char* slsDetector::setReceiver(string receiverIP){
|
|||||||
setTimer(FRAME_NUMBER,thisDetector->timerValue[FRAME_NUMBER]);
|
setTimer(FRAME_NUMBER,thisDetector->timerValue[FRAME_NUMBER]);
|
||||||
setDynamicRange(thisDetector->dynamicRange);
|
setDynamicRange(thisDetector->dynamicRange);
|
||||||
activate(-1);
|
activate(-1);
|
||||||
|
//std::cout << "***********************************dataStreaming:" << parentDet->enableDataStreamingFromReceiver(-1) << endl << endl;
|
||||||
|
//parentDet->enableDataStreamingFromReceiver(parentDet->enableDataStreamingFromReceiver(-1));
|
||||||
//set scan tag
|
//set scan tag
|
||||||
setUDPConnection();
|
setUDPConnection();
|
||||||
if(thisDetector->myDetectorType == EIGER)
|
if(thisDetector->myDetectorType == EIGER)
|
||||||
@ -7381,64 +7383,6 @@ int slsDetector::resetFramesCaught(){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
int* slsDetector::readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex){
|
|
||||||
int fnum=F_READ_RECEIVER_FRAME;
|
|
||||||
int nel=thisDetector->dataBytes/sizeof(int);
|
|
||||||
int* retval=new int[nel];
|
|
||||||
int ret=FAIL;
|
|
||||||
int n;
|
|
||||||
char mess[MAX_STR_LENGTH]="Nothing";
|
|
||||||
|
|
||||||
if (setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG) {
|
|
||||||
#ifdef VERBOSE
|
|
||||||
std::cout<< "slsDetector: Reading frame from receiver "<< thisDetector->dataBytes << " " <<nel <<std::endl;
|
|
||||||
#endif
|
|
||||||
if (connectData() == OK){
|
|
||||||
dataSocket->SendDataOnly(&fnum,sizeof(fnum));
|
|
||||||
dataSocket->ReceiveDataOnly(&ret,sizeof(ret));
|
|
||||||
|
|
||||||
if (ret==FAIL) {
|
|
||||||
n= dataSocket->ReceiveDataOnly(mess,sizeof(mess));
|
|
||||||
std::cout<< "Detector returned: " << mess << " " << n << std::endl;
|
|
||||||
delete [] retval;
|
|
||||||
disconnectData();
|
|
||||||
return NULL;
|
|
||||||
} else {
|
|
||||||
n=dataSocket->ReceiveDataOnly(fName,MAX_STR_LENGTH);
|
|
||||||
n=dataSocket->ReceiveDataOnly(&acquisitionIndex,sizeof(acquisitionIndex));
|
|
||||||
n=dataSocket->ReceiveDataOnly(&frameIndex,sizeof(frameIndex));
|
|
||||||
if(thisDetector->myDetectorType == EIGER)
|
|
||||||
n=dataSocket->ReceiveDataOnly(&subFrameIndex,sizeof(subFrameIndex));
|
|
||||||
n=dataSocket->ReceiveDataOnly(retval,thisDetector->dataBytes);
|
|
||||||
|
|
||||||
#ifdef VERBOSE
|
|
||||||
std::cout<< "Received "<< n << " data bytes" << std::endl;
|
|
||||||
#endif
|
|
||||||
if (n!=thisDetector->dataBytes) {
|
|
||||||
std::cout<<endl<< "wrong data size received: received " << n << " but expected from receiver " << thisDetector->dataBytes << std::endl;
|
|
||||||
ret=FAIL;
|
|
||||||
delete [] retval;
|
|
||||||
disconnectData();
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
//jungfrau masking adcval
|
|
||||||
if(thisDetector->myDetectorType == JUNGFRAU){
|
|
||||||
for(unsigned int i=0;i<nel;i++){
|
|
||||||
retval[i] = (retval[i] & 0x3FFF3FFF);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
disconnectData();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return retval;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int slsDetector::lockReceiver(int lock){
|
int slsDetector::lockReceiver(int lock){
|
||||||
int fnum=F_LOCK_RECEIVER;
|
int fnum=F_LOCK_RECEIVER;
|
||||||
@ -7726,18 +7670,18 @@ int64_t slsDetector::clearAllErrorMask(){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
int slsDetector::setReadReceiverFrequency(int getFromReceiver,int i){
|
int slsDetector::setReadReceiverFrequency(int getFromReceiver, int freq){
|
||||||
int fnum=F_READ_RECEIVER_FREQUENCY;
|
int fnum=F_READ_RECEIVER_FREQUENCY;
|
||||||
int ret = FAIL;
|
int ret = FAIL;
|
||||||
int retval=-1;
|
int retval=-1;
|
||||||
int arg = i;
|
int arg = freq;
|
||||||
|
|
||||||
if(!getFromReceiver)
|
if(!getFromReceiver)
|
||||||
return retval;
|
return retval;
|
||||||
|
|
||||||
if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){
|
if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){
|
||||||
#ifdef VERBOSE
|
#ifdef VERBOSE
|
||||||
std::cout << "Sending read frequency to receiver " << arg << std::endl;
|
std::cout << "Sending read frequency to receiver " << arg << std::endl;
|
||||||
#endif
|
#endif
|
||||||
if (connectData() == OK)
|
if (connectData() == OK)
|
||||||
ret=thisReceiver->sendInt(fnum,retval,arg);
|
ret=thisReceiver->sendInt(fnum,retval,arg);
|
||||||
@ -7748,14 +7692,44 @@ int slsDetector::setReadReceiverFrequency(int getFromReceiver,int i){
|
|||||||
updateReceiver();
|
updateReceiver();
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((i > 0) && (retval != i)){
|
if ((freq > 0) && (retval != freq)){
|
||||||
cout << "could not set receiver read frequency:" << retval << endl;
|
cout << "could not set receiver read frequency to " << freq <<" Returned:" << retval << endl;
|
||||||
setErrorMask((getErrorMask())|(RECEIVER_READ_FREQUENCY));
|
setErrorMask((getErrorMask())|(RECEIVER_READ_FREQUENCY));
|
||||||
}
|
}
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int slsDetector::enableDataStreamingFromReceiver(int enable){
|
||||||
|
int fnum=F_STREAM_DATA_FROM_RECEIVER;
|
||||||
|
int ret = FAIL;
|
||||||
|
int retval=-1;
|
||||||
|
int arg = enable;
|
||||||
|
|
||||||
|
|
||||||
|
if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){
|
||||||
|
#ifdef VERBOSE
|
||||||
|
std::cout << "***************Sending Data Streaming in Receiver " << arg << std::endl;
|
||||||
|
#endif
|
||||||
|
if (connectData() == OK)
|
||||||
|
ret=thisReceiver->sendInt(fnum,retval,arg);
|
||||||
|
disconnectData();
|
||||||
|
if(ret==FAIL)
|
||||||
|
retval = -1;
|
||||||
|
if(ret==FORCE_UPDATE)
|
||||||
|
updateReceiver();
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((enable > 0) && (retval != enable)){
|
||||||
|
cout << "could not set data streaming in receiver to " << enable <<" Returned:" << retval << endl;
|
||||||
|
setErrorMask((getErrorMask())|(DATA_STREAMING));
|
||||||
|
}
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int slsDetector::enableReceiverCompression(int i){
|
int slsDetector::enableReceiverCompression(int i){
|
||||||
int fnum=F_ENABLE_RECEIVER_COMPRESSION;
|
int fnum=F_ENABLE_RECEIVER_COMPRESSION;
|
||||||
int ret = FAIL;
|
int ret = FAIL;
|
||||||
|
@ -1574,14 +1574,16 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
|
|||||||
int resetFramesCaught();
|
int resetFramesCaught();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads a frame from receiver
|
* Create Receiving Data Threads
|
||||||
* @param fName file name of current frame()
|
* @param destroy is true to destroy all the threads
|
||||||
* @param acquisitionIndex current acquisition index
|
* @return OK or FAIL
|
||||||
* @param frameIndex current frame index (for each scan)
|
*/
|
||||||
* @param subFrameIndex current sub frame index (for 32 bit mode for eiger)
|
int createReceivingDataThreads(bool destroy = false){};
|
||||||
/returns a frame read from recever
|
|
||||||
|
|
||||||
|
/** Reads frames from receiver through a constant socket
|
||||||
*/
|
*/
|
||||||
int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex);
|
void readFrameFromReceiver(){};
|
||||||
|
|
||||||
/** Locks/Unlocks the connection to the receiver
|
/** Locks/Unlocks the connection to the receiver
|
||||||
/param lock sets (1), usets (0), gets (-1) the lock
|
/param lock sets (1), usets (0), gets (-1) the lock
|
||||||
@ -1682,14 +1684,20 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
|
|||||||
char* setDetectorNetworkParameter(networkParameter index, int delay);
|
char* setDetectorNetworkParameter(networkParameter index, int delay);
|
||||||
|
|
||||||
/** Sets the read receiver frequency
|
/** Sets the read receiver frequency
|
||||||
if Receiver read upon gui request, readRxrFrequency=0,
|
if data required from receiver randomly readRxrFrequency=0,
|
||||||
else every nth frame to be sent to gui
|
else every nth frame to be sent to gui
|
||||||
@param getFromReceiver is 1 if it should ask the receiver,
|
@param getFromReceiver is 1 if it should ask the receiver,
|
||||||
0 if it can get it from multislsdetecter
|
0 if it can get it from multi structure
|
||||||
@param i is the receiver read frequency
|
@param freq is the receiver read frequency
|
||||||
/returns read receiver frequency
|
/returns read receiver frequency
|
||||||
*/
|
*/
|
||||||
int setReadReceiverFrequency(int getFromReceiver, int i=-1);
|
int setReadReceiverFrequency(int getFromReceiver, int freq=-1);
|
||||||
|
|
||||||
|
/** Enable or disable streaming data from receiver to client
|
||||||
|
* @param enable 0 to disable 1 to enable -1 to only get the value
|
||||||
|
* @returns data streaming
|
||||||
|
*/
|
||||||
|
int enableDataStreamingFromReceiver(int enable=-1);
|
||||||
|
|
||||||
/** enable/disable or get data compression in receiver
|
/** enable/disable or get data compression in receiver
|
||||||
* @param i is -1 to get, 0 to disable and 1 to enable
|
* @param i is -1 to get, 0 to disable and 1 to enable
|
||||||
|
@ -506,26 +506,20 @@ class slsDetectorBase : public virtual slsDetectorDefs, public virtual errorDef
|
|||||||
virtual int lockReceiver(int lock=-1)=0;
|
virtual int lockReceiver(int lock=-1)=0;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/** Reads frames from receiver through a constant socket
|
||||||
* Reads a frame from receiver
|
|
||||||
* @param fName file name of current frame()
|
|
||||||
* @param acquisitionIndex current acquisition index
|
|
||||||
* @param frameIndex current frame index (for each scan)
|
|
||||||
* @param subFrameIndex current sub frame index (for 32 bit mode for eiger)
|
|
||||||
/returns a frame read from recever
|
|
||||||
*/
|
*/
|
||||||
virtual int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex)=0;
|
virtual void readFrameFromReceiver()=0;
|
||||||
|
|
||||||
|
|
||||||
/** Sets the read receiver frequency
|
/** Sets the read receiver frequency
|
||||||
if Receiver read upon gui request, readRxrFrequency=0,
|
if data required from receiver randomly readRxrFrequency=0,
|
||||||
else every nth frame to be sent to gui
|
else every nth frame to be sent to gui
|
||||||
@param getFromReceiver is 1 if it should ask the receiver,
|
@param getFromReceiver is 1 if it should ask the receiver,
|
||||||
0 if it can get it from multislsdetecter
|
0 if it can get it from multi structure
|
||||||
@param i is the receiver read frequency
|
@param freq is the receiver read frequency
|
||||||
/returns read receiver frequency
|
/returns read receiver frequency
|
||||||
*/
|
*/
|
||||||
virtual int setReadReceiverFrequency(int getFromReceiver, int i=-1)=0;
|
virtual int setReadReceiverFrequency(int getFromReceiver, int freq=-1)=0;
|
||||||
|
|
||||||
/** Sets the receiver to start any readout remaining in the fifo and
|
/** Sets the receiver to start any readout remaining in the fifo and
|
||||||
* change status to transmitting.
|
* change status to transmitting.
|
||||||
|
@ -4502,7 +4502,8 @@ string slsDetectorCommand::cmdReceiver(int narg, char *args[], int action) {
|
|||||||
if(!strcasecmp(args[1],"start"))
|
if(!strcasecmp(args[1],"start"))
|
||||||
myDet->startReceiver();
|
myDet->startReceiver();
|
||||||
else if(!strcasecmp(args[1],"stop")){
|
else if(!strcasecmp(args[1],"stop")){
|
||||||
myDet->startReceiverReadout();
|
//myDet->stopReceiver();
|
||||||
|
// myDet->startReceiverReadout();
|
||||||
/*runStatus s = myDet->getReceiverStatus();
|
/*runStatus s = myDet->getReceiverStatus();
|
||||||
while(s != RUN_FINISHED){
|
while(s != RUN_FINISHED){
|
||||||
usleep(50000);
|
usleep(50000);
|
||||||
|
@ -240,7 +240,9 @@ int slsDetectorUsers::setReceiverMode(int n){
|
|||||||
return myDetector->setReadReceiverFrequency(1,n);
|
return myDetector->setReadReceiverFrequency(1,n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int slsDetectorUsers::enableDataStreamingFromReceiver(int i){
|
||||||
|
return myDetector->enableDataStreamingFromReceiver(i);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int64_t slsDetectorUsers::getModuleFirmwareVersion(){
|
int64_t slsDetectorUsers::getModuleFirmwareVersion(){
|
||||||
|
@ -450,6 +450,15 @@ class slsDetectorUsers
|
|||||||
|
|
||||||
virtual void finalizeDataset(double *a, double *v, double *e, int &np);
|
virtual void finalizeDataset(double *a, double *v, double *e, int &np);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
Enable data streaming from receiver (zmq)
|
||||||
|
\param i 1 to set, 0 to reset and -1 to get
|
||||||
|
\returns data streaming enable
|
||||||
|
*/
|
||||||
|
int enableDataStreamingFromReceiver(int i=-1);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
get get Module Firmware Version
|
get get Module Firmware Version
|
||||||
\returns id
|
\returns id
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <sys/ipc.h>
|
#include <sys/ipc.h>
|
||||||
#include <sys/shm.h>
|
#include <sys/shm.h>
|
||||||
|
#include <time.h> //clock()
|
||||||
using namespace std;
|
using namespace std;
|
||||||
slsDetectorUtils::slsDetectorUtils() {
|
slsDetectorUtils::slsDetectorUtils() {
|
||||||
|
|
||||||
@ -42,6 +42,8 @@ slsDetectorUtils::slsDetectorUtils() {
|
|||||||
|
|
||||||
|
|
||||||
int slsDetectorUtils::acquire(int delflag){
|
int slsDetectorUtils::acquire(int delflag){
|
||||||
|
struct timespec begin,end;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &begin);
|
||||||
|
|
||||||
//ensure acquire isnt started multiple times by same client
|
//ensure acquire isnt started multiple times by same client
|
||||||
if(getAcquiringFlag() == false)
|
if(getAcquiringFlag() == false)
|
||||||
@ -58,11 +60,15 @@ int slsDetectorUtils::acquire(int delflag){
|
|||||||
}else{
|
}else{
|
||||||
//put receiver read frequency to random if no gui
|
//put receiver read frequency to random if no gui
|
||||||
int ret = setReadReceiverFrequency(0);
|
int ret = setReadReceiverFrequency(0);
|
||||||
if(ret>0 && (acquisition_finished == NULL)){
|
if(ret>0 && (dataReady == NULL)){
|
||||||
std::cout << "Error: receiver read frequency is set to " << ret << " but should be > 0 only when using gui." << std::endl;
|
|
||||||
ret = setReadReceiverFrequency(1,0);
|
ret = setReadReceiverFrequency(1,0);
|
||||||
std::cout << "Current receiver read frequency: " << ret << std::endl;
|
std::cout << "No Data call back and hence receiver read frequency reset to " << ret <<" (Random)" << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//start/stop data streaming threads if threads in client enabled/disabled
|
||||||
|
ret = enableDataStreamingFromReceiver(-1);
|
||||||
|
// cout<<"getting datastream:"<<ret<<endl;
|
||||||
|
// cout<<"result of enabledatastream:"<<enableDataStreamingFromReceiver(ret)<<endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
int nc=setTimer(CYCLES_NUMBER,-1);
|
int nc=setTimer(CYCLES_NUMBER,-1);
|
||||||
@ -72,10 +78,6 @@ int slsDetectorUtils::acquire(int delflag){
|
|||||||
|
|
||||||
int multiframe = nc*nf;
|
int multiframe = nc*nf;
|
||||||
|
|
||||||
pthread_mutex_lock(&mg);
|
|
||||||
acquiringDone = 0;
|
|
||||||
pthread_mutex_unlock(&mg);
|
|
||||||
|
|
||||||
// setTotalProgress();
|
// setTotalProgress();
|
||||||
//moved these 2 here for measurement change
|
//moved these 2 here for measurement change
|
||||||
progressIndex=0;
|
progressIndex=0;
|
||||||
@ -148,11 +150,10 @@ int slsDetectorUtils::acquire(int delflag){
|
|||||||
stopReceiver();
|
stopReceiver();
|
||||||
if(setReceiverOnline()==OFFLINE_FLAG)
|
if(setReceiverOnline()==OFFLINE_FLAG)
|
||||||
*stoppedFlag=1;
|
*stoppedFlag=1;
|
||||||
|
|
||||||
//multi detectors shouldnt have different receiver read frequencies enabled/disabled
|
//multi detectors shouldnt have different receiver read frequencies enabled/disabled
|
||||||
if(setReadReceiverFrequency(0) < 0){
|
if(setReadReceiverFrequency(0) < 0){
|
||||||
std::cout << "Error: The receiver read frequency is invalid:" << setReadReceiverFrequency(0) << std::endl;
|
std::cout << "Error: The receiver read frequency is invalid:" << setReadReceiverFrequency(0) << std::endl;
|
||||||
*stoppedFlag=1;
|
*stoppedFlag=1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(setReceiverOnline()==OFFLINE_FLAG)
|
if(setReceiverOnline()==OFFLINE_FLAG)
|
||||||
@ -160,22 +161,20 @@ int slsDetectorUtils::acquire(int delflag){
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (*threadedProcessing)
|
||||||
if (*threadedProcessing) {
|
startThread(delflag);
|
||||||
sem_init(&sem_queue,0,0);
|
|
||||||
startThread(delflag);
|
|
||||||
}
|
|
||||||
#ifdef VERBOSE
|
#ifdef VERBOSE
|
||||||
cout << " starting thread " << endl;
|
cout << " starting thread " << endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
//resets frames caught in receiver
|
//resets frames caught in receiver
|
||||||
if(receiver){
|
if(receiver){
|
||||||
pthread_mutex_lock(&mg);
|
pthread_mutex_lock(&mg);
|
||||||
resetFramesCaught();
|
resetFramesCaught();
|
||||||
pthread_mutex_unlock(&mg);
|
pthread_mutex_unlock(&mg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
for(int im=0;im<nm;im++) {
|
for(int im=0;im<nm;im++) {
|
||||||
|
|
||||||
#ifdef VERBOSE
|
#ifdef VERBOSE
|
||||||
@ -281,7 +280,7 @@ int slsDetectorUtils::acquire(int delflag){
|
|||||||
|
|
||||||
|
|
||||||
if(receiver){
|
if(receiver){
|
||||||
pthread_mutex_unlock(&mg);//unlock previous
|
pthread_mutex_unlock(&mg);
|
||||||
pthread_mutex_lock(&mp);
|
pthread_mutex_lock(&mp);
|
||||||
createFileName();
|
createFileName();
|
||||||
pthread_mutex_unlock(&mp);
|
pthread_mutex_unlock(&mp);
|
||||||
@ -341,55 +340,21 @@ int slsDetectorUtils::acquire(int delflag){
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
|
|
||||||
|
pthread_mutex_lock(&mg);
|
||||||
//offline
|
//offline
|
||||||
pthread_mutex_lock(&mg);
|
pthread_mutex_lock(&mg);
|
||||||
if(setReceiverOnline()==OFFLINE_FLAG){
|
if(setReceiverOnline()==OFFLINE_FLAG){
|
||||||
// wait until data processing thread has finished the data
|
if ((getDetectorsType()==GOTTHARD) || (getDetectorsType()==MOENCH) || (getDetectorsType()==JUNGFRAU) ){
|
||||||
acquiringDone = 1;
|
if((*correctionMask)&(1<<WRITE_FILE))
|
||||||
pthread_mutex_unlock(&mg);
|
closeDataFile();
|
||||||
if (*threadedProcessing) {
|
|
||||||
sem_wait(&sem_queue);
|
|
||||||
pthread_mutex_lock(&mg);
|
|
||||||
acquiringDone = 0;
|
|
||||||
pthread_mutex_unlock(&mg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef VERBOSE
|
|
||||||
cout << "check data queue size " << endl;
|
|
||||||
#endif
|
|
||||||
/*while (dataQueueSize()){
|
|
||||||
//#ifdef VERBOSE
|
|
||||||
cout << "AAAAAAAAA check data queue size " << endl;
|
|
||||||
//#endif
|
|
||||||
usleep(100000);
|
|
||||||
}*/
|
|
||||||
|
|
||||||
if ((getDetectorsType()==GOTTHARD) || (getDetectorsType()==MOENCH) || (getDetectorsType()==JUNGFRAU) ){
|
|
||||||
if((*correctionMask)&(1<<WRITE_FILE))
|
|
||||||
closeDataFile();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
//online
|
//online
|
||||||
else{
|
else{
|
||||||
acquiringDone = 1;
|
|
||||||
pthread_mutex_unlock(&mg);
|
|
||||||
|
|
||||||
// wait until data processing thread has taken the last frame
|
|
||||||
if (*threadedProcessing) {
|
|
||||||
sem_wait(&sem_queue);
|
|
||||||
pthread_mutex_lock(&mg);
|
|
||||||
acquiringDone = 0;
|
|
||||||
pthread_mutex_unlock(&mg);
|
|
||||||
}
|
|
||||||
pthread_mutex_lock(&mg);
|
|
||||||
stopReceiver();
|
stopReceiver();
|
||||||
pthread_mutex_unlock(&mg);
|
// cout<<"***********receiver stopped"<<endl;
|
||||||
}
|
}
|
||||||
|
pthread_mutex_unlock(&mg);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -507,7 +472,6 @@ int slsDetectorUtils::acquire(int delflag){
|
|||||||
#endif
|
#endif
|
||||||
setJoinThread(1);
|
setJoinThread(1);
|
||||||
pthread_join(dataProcessingThread, &status);
|
pthread_join(dataProcessingThread, &status);
|
||||||
sem_destroy(&sem_queue);
|
|
||||||
#ifdef VERBOSE
|
#ifdef VERBOSE
|
||||||
cout << "data processing thread joined" << endl;
|
cout << "data processing thread joined" << endl;
|
||||||
#endif
|
#endif
|
||||||
@ -538,6 +502,10 @@ int slsDetectorUtils::acquire(int delflag){
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
setAcquiringFlag(false);
|
setAcquiringFlag(false);
|
||||||
|
|
||||||
|
clock_gettime(CLOCK_REALTIME, &end);
|
||||||
|
cout << "Elapsed time for acquisition:" << (( end.tv_sec - begin.tv_sec ) + ( end.tv_nsec - begin.tv_nsec ) / 1000000000.0) << " seconds" << endl;
|
||||||
|
|
||||||
return OK;
|
return OK;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ extern "C" {
|
|||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
|
#include <semaphore.h>
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
|
||||||
@ -647,16 +647,17 @@ virtual int getReceiverCurrentFrameIndex()=0;
|
|||||||
virtual int resetFramesCaught()=0;
|
virtual int resetFramesCaught()=0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads a frame from receiver
|
* Create Receiving Data Threads
|
||||||
* @param fName file name of current frame()
|
* @param destroy is true to destroy all the threads
|
||||||
* @param acquisitionIndex current acquisition index
|
* @return OK or FAIL
|
||||||
* @param frameIndex current frame index (for each scan)
|
*/
|
||||||
* @param subFrameIndex current sub frame index (for 32 bit mode for eiger)
|
virtual int createReceivingDataThreads(bool destroy = false)=0;
|
||||||
/returns a frame read from recever
|
|
||||||
*/
|
|
||||||
virtual int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex)=0;
|
|
||||||
|
|
||||||
|
|
||||||
|
/** Reads frames from receiver through a constant socket
|
||||||
|
*/
|
||||||
|
virtual void readFrameFromReceiver()=0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Turns off the receiver server!
|
Turns off the receiver server!
|
||||||
*/
|
*/
|
||||||
@ -699,14 +700,20 @@ virtual int setROI(int n=-1,ROI roiLimits[]=NULL)=0;
|
|||||||
virtual ROI* getROI(int &n)=0;
|
virtual ROI* getROI(int &n)=0;
|
||||||
|
|
||||||
/** Sets the read receiver frequency
|
/** Sets the read receiver frequency
|
||||||
if Receiver read upon gui request, readRxrFrequency=0,
|
if data required from receiver randomly readRxrFrequency=0,
|
||||||
else every nth frame to be sent to gui
|
else every nth frame to be sent to gui
|
||||||
@param getFromReceiver is 1 if it should ask the receiver,
|
@param getFromReceiver is 1 if it should ask the receiver,
|
||||||
0 if it can get it from multislsdetecter
|
0 if it can get it from multi structure
|
||||||
@param i is the receiver read frequency
|
@param freq is the receiver read frequency
|
||||||
/returns read receiver frequency
|
/returns read receiver frequency
|
||||||
*/
|
*/
|
||||||
virtual int setReadReceiverFrequency(int getFromReceiver, int i=-1)=0;
|
virtual int setReadReceiverFrequency(int getFromReceiver, int freq=-1)=0;
|
||||||
|
|
||||||
|
/** Enable or disable streaming of data from receiver to client
|
||||||
|
* @param enable 0 to disable 1 to enable -1 to only get the value
|
||||||
|
* @returns data streaming
|
||||||
|
*/
|
||||||
|
virtual int enableDataStreamingFromReceiver(int enable=-1)=0;
|
||||||
|
|
||||||
|
|
||||||
/** enable/disable or get data compression in receiver
|
/** enable/disable or get data compression in receiver
|
||||||
@ -830,7 +837,6 @@ virtual int setReceiverFifoDepth(int i = -1)=0;
|
|||||||
|
|
||||||
//protected:
|
//protected:
|
||||||
int *stoppedFlag;
|
int *stoppedFlag;
|
||||||
|
|
||||||
int64_t *timerValue;
|
int64_t *timerValue;
|
||||||
detectorSettings *currentSettings;
|
detectorSettings *currentSettings;
|
||||||
int *currentThresholdEV;
|
int *currentThresholdEV;
|
||||||
@ -856,6 +862,8 @@ virtual int setReceiverFifoDepth(int i = -1)=0;
|
|||||||
int (*progress_call)(double,void*);
|
int (*progress_call)(double,void*);
|
||||||
void *pProgressCallArg;
|
void *pProgressCallArg;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
#include "usersFunctions.h"
|
#include "usersFunctions.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
//#define VERBOSE
|
//#define VERBOSE
|
||||||
|
|
||||||
static void* startProcessData(void *n){
|
static void* startProcessData(void *n){
|
||||||
@ -461,12 +462,7 @@ void* postProcessing::processData(int delflag) {
|
|||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
/** IF detector acquisition is done, let the acquire() thread know to finish up and force join thread */
|
|
||||||
if(acquiringDone){
|
|
||||||
sem_post(&sem_queue);
|
|
||||||
// cout << "Sem posted" << endl;
|
|
||||||
} //else
|
|
||||||
// cout << "Sem not posted" << endl;
|
|
||||||
/* IF THERE ARE NO DATA look if acquisition is finished */
|
/* IF THERE ARE NO DATA look if acquisition is finished */
|
||||||
if (checkJoinThread()) {
|
if (checkJoinThread()) {
|
||||||
if (dataQueueSize()==0) {
|
if (dataQueueSize()==0) {
|
||||||
@ -486,181 +482,43 @@ void* postProcessing::processData(int delflag) {
|
|||||||
}
|
}
|
||||||
//receiver
|
//receiver
|
||||||
else{
|
else{
|
||||||
|
//cprintf(RED,"In post processing threads\n");
|
||||||
|
|
||||||
|
|
||||||
int progress = 0;
|
if(dataReady){
|
||||||
char currentfName[MAX_STR_LENGTH]="";
|
readFrameFromReceiver();
|
||||||
int caught = -1;
|
}
|
||||||
int currentAcquisitionIndex = -1;
|
|
||||||
int currentFrameIndex = -1;
|
|
||||||
int currentSubFrameIndex = -1;
|
|
||||||
bool newData = false;
|
|
||||||
int nthframe = setReadReceiverFrequency(0);
|
|
||||||
int nx =getTotalNumberOfChannels(slsDetectorDefs::X);
|
|
||||||
int ny =getTotalNumberOfChannels(slsDetectorDefs::Y);
|
|
||||||
|
|
||||||
#ifdef VERBOSE
|
//only update progress
|
||||||
std::cout << "receiver read freq:" << nthframe << std::endl;
|
else{
|
||||||
#endif
|
int caught = -1;
|
||||||
|
while(true){
|
||||||
|
cout.flush();
|
||||||
|
cout<<flush;
|
||||||
|
usleep(20000); //20ms need this else connecting error to receiver (too fast)
|
||||||
|
|
||||||
|
//get progress
|
||||||
//repeat forever until joined by the calling thread
|
if(setReceiverOnline() == ONLINE_FLAG){
|
||||||
while(1){
|
|
||||||
|
|
||||||
cout.flush();
|
|
||||||
cout<<flush;
|
|
||||||
usleep(20000); //20ms need this else connecting error to receiver (too fast)
|
|
||||||
|
|
||||||
//get progress
|
|
||||||
pthread_mutex_lock(&mg);
|
|
||||||
if(setReceiverOnline() == ONLINE_FLAG)
|
|
||||||
caught = getFramesCaughtByReceiver();
|
|
||||||
pthread_mutex_unlock(&mg);
|
|
||||||
//updating progress
|
|
||||||
if(caught!= -1){
|
|
||||||
setCurrentProgress(caught);
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "caught:" << caught << endl;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//detector acquistion done, wait for all frames received
|
|
||||||
if(acquiringDone > 0){
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
if(acquiringDone == 1) cout << "acquiring seems to be done" << endl;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
//IF GUI, check for last frames (counter upto 5)
|
|
||||||
if(dataReady){
|
|
||||||
pthread_mutex_lock(&mg);
|
pthread_mutex_lock(&mg);
|
||||||
acquiringDone++;
|
caught = getFramesCaughtByReceiver();
|
||||||
pthread_mutex_unlock(&mg);
|
pthread_mutex_unlock(&mg);
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "acquiringDone :" << acquiringDone << endl;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
//updating progress
|
||||||
|
if(caught!= -1){
|
||||||
//post to stopReceiver in acquire(), but continue reading frames
|
setCurrentProgress(caught);
|
||||||
if (!dataReady || (acquiringDone >= 5)){
|
#ifdef VERY_VERY_DEBUG
|
||||||
if(!dataReady || (!nthframe) ||(!newData)){
|
cout << "caught:" << caught << endl;
|
||||||
#ifdef VERY_VERY_DEBUG
|
#endif
|
||||||
cout << "gonna post for it to end" << endl;
|
|
||||||
#endif
|
|
||||||
sem_post(&sem_queue);
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "Sem posted" << endl;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
//random reads and for nthframe, checks if there is no new data
|
|
||||||
else if((!nthframe) ||(!newData)){
|
|
||||||
//cout <<"cecking now" << endl;
|
|
||||||
if (checkJoinThread()){
|
if (checkJoinThread()){
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//for random reads, ask only if it has new data
|
|
||||||
if(!newData){
|
|
||||||
if(caught > progress){
|
|
||||||
newData = true;
|
|
||||||
|
|
||||||
// If new data and acquiringDone>0 (= det acq over), reset to get more frames
|
|
||||||
if(dataReady && (acquiringDone > 0)){
|
|
||||||
pthread_mutex_lock(&mg);
|
|
||||||
acquiringDone = 1;
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "Keeping acquiringDone at 1 " << endl;
|
|
||||||
#endif
|
|
||||||
pthread_mutex_unlock(&mg);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if(newData){
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "new data" << endl;
|
|
||||||
#endif
|
|
||||||
//no gui
|
|
||||||
if (!dataReady){
|
|
||||||
progress = caught;
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "progress:" << progress << endl;
|
|
||||||
#endif
|
|
||||||
newData = false;
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "newData set to false" << endl;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
//gui
|
|
||||||
else{
|
|
||||||
pthread_mutex_lock(&mg);
|
|
||||||
if(setReceiverOnline()==ONLINE_FLAG){
|
|
||||||
//get data
|
|
||||||
strcpy(currentfName,"");
|
|
||||||
//int* receiverData = new int [getTotalNumberOfChannels()];
|
|
||||||
int* receiverData = readFrameFromReceiver(currentfName,currentAcquisitionIndex,currentFrameIndex,currentSubFrameIndex);
|
|
||||||
|
|
||||||
//if detector returned null
|
|
||||||
if(setReceiverOnline()==OFFLINE_FLAG)
|
|
||||||
receiverData = NULL;
|
|
||||||
|
|
||||||
//no data or wrong data for print out
|
|
||||||
if(receiverData == NULL){
|
|
||||||
currentAcquisitionIndex = -1;
|
|
||||||
cout<<"****Detector Data returned is NULL***"<<endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
// garbage frame
|
|
||||||
if(currentAcquisitionIndex < 0){
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout<<"****Detector returned mismatched indices/garbage or acquisition is over. Trying again.***"<<endl;
|
|
||||||
#endif
|
|
||||||
if(receiverData)
|
|
||||||
delete [] receiverData;
|
|
||||||
}
|
|
||||||
//not garbage frame
|
|
||||||
else{// if (currentAcquisitionIndex > progress){
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "GOT data" << endl;
|
|
||||||
#endif
|
|
||||||
fdata = decodeData(receiverData);
|
|
||||||
delete [] receiverData;
|
|
||||||
if ((fdata) && (dataReady)){
|
|
||||||
// cout << "DATAREADY 3" << endl;
|
|
||||||
thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentfName,nx,ny);
|
|
||||||
dataReady(thisData, currentFrameIndex, currentSubFrameIndex, pCallbackArg);
|
|
||||||
delete thisData;
|
|
||||||
fdata = NULL;
|
|
||||||
progress = caught;
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "progress:" << progress << endl;
|
|
||||||
#endif
|
|
||||||
newData = false;
|
|
||||||
#ifdef VERY_VERY_DEBUG
|
|
||||||
cout << "newData set to false" << endl;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&mg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//cout<<"exiting from proccessing thread"<<endl;
|
||||||
|
|
||||||
|
//cprintf(RED,"Exiting post processing thread\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -300,11 +300,7 @@ s
|
|||||||
/** data queue size */
|
/** data queue size */
|
||||||
int queuesize;
|
int queuesize;
|
||||||
|
|
||||||
/** queue mutex */
|
|
||||||
sem_t sem_queue;
|
|
||||||
|
|
||||||
/** set when detector finishes acquiring */
|
|
||||||
int acquiringDone;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -333,18 +329,22 @@ s
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
private:
|
|
||||||
double *fdata;
|
double *fdata;
|
||||||
|
|
||||||
int (*dataReady)(detectorData*,int, int,void*);
|
int (*dataReady)(detectorData*,int, int,void*);
|
||||||
void *pCallbackArg;
|
void *pCallbackArg;
|
||||||
|
detectorData *thisData;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// double *fdata;
|
||||||
|
// int (*dataReady)(detectorData*,int, int,void*);
|
||||||
|
// void *pCallbackArg;
|
||||||
|
|
||||||
int (*rawDataReady)(double*,int,void*);
|
int (*rawDataReady)(double*,int,void*);
|
||||||
void *pRawDataArg;
|
void *pRawDataArg;
|
||||||
|
|
||||||
|
|
||||||
postProcessingFuncs *ppFun;
|
postProcessingFuncs *ppFun;
|
||||||
detectorData *thisData;
|
//detectorData *thisData;
|
||||||
|
|
||||||
|
|
||||||
double *ang;
|
double *ang;
|
||||||
@ -374,4 +374,5 @@ s
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -18,6 +18,8 @@ add_executable(sls_detector_get
|
|||||||
target_link_libraries(sls_detector_get
|
target_link_libraries(sls_detector_get
|
||||||
slsDetectorShared
|
slsDetectorShared
|
||||||
pthread
|
pthread
|
||||||
|
zmq
|
||||||
|
rt
|
||||||
)
|
)
|
||||||
set_target_properties(sls_detector_get PROPERTIES
|
set_target_properties(sls_detector_get PROPERTIES
|
||||||
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
|
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
|
||||||
@ -30,6 +32,8 @@ add_executable(sls_detector_put
|
|||||||
target_link_libraries(sls_detector_put
|
target_link_libraries(sls_detector_put
|
||||||
slsDetectorShared
|
slsDetectorShared
|
||||||
pthread
|
pthread
|
||||||
|
zmq
|
||||||
|
rt
|
||||||
)
|
)
|
||||||
set_target_properties(sls_detector_put PROPERTIES
|
set_target_properties(sls_detector_put PROPERTIES
|
||||||
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
|
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
|
||||||
@ -42,6 +46,8 @@ add_executable(sls_detector_acquire
|
|||||||
target_link_libraries(sls_detector_acquire
|
target_link_libraries(sls_detector_acquire
|
||||||
slsDetectorShared
|
slsDetectorShared
|
||||||
pthread
|
pthread
|
||||||
|
zmq
|
||||||
|
rt
|
||||||
)
|
)
|
||||||
set_target_properties(sls_detector_acquire PROPERTIES
|
set_target_properties(sls_detector_acquire PROPERTIES
|
||||||
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
|
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
|
||||||
@ -54,6 +60,8 @@ add_executable(sls_detector_help
|
|||||||
target_link_libraries(sls_detector_help
|
target_link_libraries(sls_detector_help
|
||||||
slsDetectorShared
|
slsDetectorShared
|
||||||
pthread
|
pthread
|
||||||
|
zmq
|
||||||
|
rt
|
||||||
)
|
)
|
||||||
set_target_properties(sls_detector_help PROPERTIES
|
set_target_properties(sls_detector_help PROPERTIES
|
||||||
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
|
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
|
||||||
|
@ -129,6 +129,24 @@ int receiverInterface::sendIntArray(int fnum, int64_t &retval, int64_t arg[2]){
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int receiverInterface::sendIntArray(int fnum, int &retval, int arg[2]){
|
||||||
|
int args[2];
|
||||||
|
int ret = slsDetectorDefs::FAIL;
|
||||||
|
char mess[100] = "";
|
||||||
|
dataSocket->SendDataOnly(&fnum,sizeof(fnum));
|
||||||
|
dataSocket->SendDataOnly(arg,sizeof(args));
|
||||||
|
dataSocket->ReceiveDataOnly(&ret,sizeof(ret));
|
||||||
|
if (ret==slsDetectorDefs::FAIL){
|
||||||
|
dataSocket->ReceiveDataOnly(mess,sizeof(mess));
|
||||||
|
std::cout<< "Receiver returned error: " << mess << std::endl;
|
||||||
|
}
|
||||||
|
dataSocket->ReceiveDataOnly(&retval,sizeof(retval));
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int receiverInterface::getInt(int fnum, int64_t &retval){
|
int receiverInterface::getInt(int fnum, int64_t &retval){
|
||||||
int ret = slsDetectorDefs::FAIL;
|
int ret = slsDetectorDefs::FAIL;
|
||||||
|
|
||||||
|
@ -95,6 +95,16 @@ public:
|
|||||||
*/
|
*/
|
||||||
int sendIntArray(int fnum, int64_t &retval, int64_t arg[2]);
|
int sendIntArray(int fnum, int64_t &retval, int64_t arg[2]);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send an integer to receiver
|
||||||
|
* @param fnum function enum to determine what parameter
|
||||||
|
* @param retval return value
|
||||||
|
* @param arg values to send
|
||||||
|
* \returns success of operation
|
||||||
|
*/
|
||||||
|
int sendIntArray(int fnum, int &retval, int arg[2]);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get an integer value from receiver
|
* Get an integer value from receiver
|
||||||
* @param fnum function enum to determine what parameter
|
* @param fnum function enum to determine what parameter
|
||||||
|
@ -16,6 +16,18 @@ using namespace std;
|
|||||||
|
|
||||||
class slsDetector;
|
class slsDetector;
|
||||||
|
|
||||||
|
template<typename _Ret, typename _Class>
|
||||||
|
class func00_t{
|
||||||
|
public:
|
||||||
|
func00_t(_Ret (_Class::*fn)(),_Class* ptr):
|
||||||
|
m_fn(fn),m_ptr(ptr){}
|
||||||
|
~func00_t() {}
|
||||||
|
void operator()() const {((m_ptr->*m_fn)());}
|
||||||
|
private:
|
||||||
|
_Class* m_ptr;
|
||||||
|
_Ret (_Class::*m_fn)();
|
||||||
|
};
|
||||||
|
|
||||||
template<typename _Ret, typename _Class, typename _Store>
|
template<typename _Ret, typename _Class, typename _Store>
|
||||||
class func0_t{
|
class func0_t{
|
||||||
public:
|
public:
|
||||||
@ -94,27 +106,32 @@ private:
|
|||||||
class Task: public virtual slsDetectorDefs{
|
class Task: public virtual slsDetectorDefs{
|
||||||
public:
|
public:
|
||||||
/* Return: int, Param: int */
|
/* Return: int, Param: int */
|
||||||
Task(func1_t <int,slsDetector,int,int>* t): m1(t),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
|
Task(func1_t <int,slsDetector,int,int>* t): m1(t),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){};
|
||||||
/* Return: int, Param: string,int */
|
/* Return: int, Param: string,int */
|
||||||
Task(func2_t <int,slsDetector,string,int,int>* t): m1(0),m2(t),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
|
Task(func2_t <int,slsDetector,string,int,int>* t): m1(0),m2(t),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){};
|
||||||
/* Return: string, Param: string */
|
/* Return: string, Param: string */
|
||||||
Task(func1_t <string,slsDetector,string,string>* t): m1(0),m2(0),m3(t),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
|
Task(func1_t <string,slsDetector,string,string>* t): m1(0),m2(0),m3(t),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){};
|
||||||
/* Return: char*, Param: char* */
|
/* Return: char*, Param: char* */
|
||||||
Task(func1_t <char*,slsDetector,char*,string>* t): m1(0),m2(0),m3(0),m4(t),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
|
Task(func1_t <char*,slsDetector,char*,string>* t): m1(0),m2(0),m3(0),m4(t),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){};
|
||||||
/* Return: detectorSettings, Param: int */
|
/* Return: detectorSettings, Param: int */
|
||||||
Task(func1_t <detectorSettings,slsDetector,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(t),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
|
Task(func1_t <detectorSettings,slsDetector,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(t),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){};
|
||||||
/* Return: detectorSettings, Param: detectorSettings,int */
|
/* Return: detectorSettings, Param: detectorSettings,int */
|
||||||
Task(func2_t <detectorSettings,slsDetector,detectorSettings,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(t),m7(0),m8(0),m9(0),m10(0),m11(0){};
|
Task(func2_t <detectorSettings,slsDetector,detectorSettings,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(t),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){};
|
||||||
/* Return: int, Param: int,int */
|
/* Return: int, Param: int,int */
|
||||||
Task(func2_t <int,slsDetector,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(t),m8(0),m9(0),m10(0),m11(0){};
|
Task(func2_t <int,slsDetector,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(t),m8(0),m9(0),m10(0),m11(0),m12(0){};
|
||||||
/* Return: int, Param: int,int */
|
/* Return: int, Param: int,int */
|
||||||
Task(func3_t <int,slsDetector,int,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(t),m9(0),m10(0),m11(0){};
|
Task(func3_t <int,slsDetector,int,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(t),m9(0),m10(0),m11(0),m12(0){};
|
||||||
/* Return: int, Param: trimMode,int,int,int */
|
/* Return: int, Param: trimMode,int,int,int */
|
||||||
Task(func4_t <int,slsDetector,trimMode,int,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(t),m10(0),m11(0){};
|
Task(func4_t <int,slsDetector,trimMode,int,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(t),m10(0),m11(0),m12(0){};
|
||||||
/* Return: int, Param: int */
|
/* Return: int, Param: none */
|
||||||
Task(func0_t <int,slsDetector,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(t),m11(0){};
|
Task(func0_t <int,slsDetector,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(t),m11(0),m12(0){};
|
||||||
/* Return: char*, Param: networkParameter,string,string */
|
/* Return: char*, Param: networkParameter,string,string */
|
||||||
Task(func2_t <char*,slsDetector,networkParameter,string,string>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(t){};
|
Task(func2_t <char*,slsDetector,networkParameter,string,string>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(t),m12(0){};
|
||||||
|
/* Return: void, Param: none */
|
||||||
|
Task(func00_t <void,slsDetector>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(t){};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
~Task(){}
|
~Task(){}
|
||||||
|
|
||||||
void operator()(){
|
void operator()(){
|
||||||
@ -129,6 +146,7 @@ public:
|
|||||||
else if(m9) (*m9)();
|
else if(m9) (*m9)();
|
||||||
else if(m10) (*m10)();
|
else if(m10) (*m10)();
|
||||||
else if(m11) (*m11)();
|
else if(m11) (*m11)();
|
||||||
|
else if(m12) (*m12)();
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -154,6 +172,8 @@ private:
|
|||||||
func0_t <int,slsDetector,int>* m10;
|
func0_t <int,slsDetector,int>* m10;
|
||||||
/* Return: char*, Param: networkParameter,string,string */
|
/* Return: char*, Param: networkParameter,string,string */
|
||||||
func2_t <char*,slsDetector,networkParameter,string,string>* m11;
|
func2_t <char*,slsDetector,networkParameter,string,string>* m11;
|
||||||
|
/* Return: void, Param: none */
|
||||||
|
func00_t <void,slsDetector>* m12;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -7,8 +7,10 @@ ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size){
|
|||||||
#endif
|
#endif
|
||||||
m_tasks_loaded = false;
|
m_tasks_loaded = false;
|
||||||
thread_started = false;
|
thread_started = false;
|
||||||
|
zmqthreadpool = false;
|
||||||
current_thread_number = -1;
|
current_thread_number = -1;
|
||||||
number_of_ongoing_tasks = 0;
|
number_of_ongoing_tasks = 0;
|
||||||
|
number_of_total_tasks = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ThreadPool::~ThreadPool(){
|
ThreadPool::~ThreadPool(){
|
||||||
@ -34,6 +36,7 @@ int ThreadPool::initialize_threadpool(){
|
|||||||
m_pool_state = STARTED;
|
m_pool_state = STARTED;
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
sem_init(&semStart,1,0);
|
sem_init(&semStart,1,0);
|
||||||
|
sem_init(&semDone,1,0);
|
||||||
for (int i = 0; i < m_pool_size; i++) {
|
for (int i = 0; i < m_pool_size; i++) {
|
||||||
pthread_t tid;
|
pthread_t tid;
|
||||||
thread_started = false;
|
thread_started = false;
|
||||||
@ -68,12 +71,15 @@ int ThreadPool::destroy_threadpool(){
|
|||||||
for (int i = 0; i < m_pool_size; i++) {
|
for (int i = 0; i < m_pool_size; i++) {
|
||||||
void* result;
|
void* result;
|
||||||
sem_post(&semStart);
|
sem_post(&semStart);
|
||||||
|
sem_post(&semDone);
|
||||||
ret = pthread_join(m_threads[i], &result);
|
ret = pthread_join(m_threads[i], &result);
|
||||||
/*cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;*/
|
/*cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;*/
|
||||||
m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting
|
m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting
|
||||||
}
|
}
|
||||||
sem_destroy(&semStart);
|
sem_destroy(&semStart);
|
||||||
|
sem_destroy(&semDone);
|
||||||
number_of_ongoing_tasks = 0;
|
number_of_ongoing_tasks = 0;
|
||||||
|
number_of_total_tasks = 0;
|
||||||
/* cout << m_pool_size << " threads exited from the thread pool" << endl;*/
|
/* cout << m_pool_size << " threads exited from the thread pool" << endl;*/
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -109,25 +115,28 @@ void* ThreadPool::execute_thread(){
|
|||||||
/*cout << "Unlocking: " << pthread_self() << endl;*/
|
/*cout << "Unlocking: " << pthread_self() << endl;*/
|
||||||
m_task_mutex.unlock();
|
m_task_mutex.unlock();
|
||||||
|
|
||||||
|
//if(zmqthreadpool) cout<<"***"<<ithread<<" semaphore start address wait:"<<&semStart<<endl;
|
||||||
sem_wait(&semStart);
|
sem_wait(&semStart);
|
||||||
|
|
||||||
|
//cout<<"***"<<ithread<<" checking out semaphore done address:"<<&semDone<<endl;
|
||||||
|
|
||||||
/*cout << ithread <<" Executing thread " << pthread_self() << endl;*/
|
/*cout << ithread <<" Executing thread " << pthread_self() << endl;*/
|
||||||
// execute the task
|
// execute the task
|
||||||
(*task)(); // could also do task->run(arg);
|
(*task)(); // could also do task->run(arg);
|
||||||
/*cout << ithread <<" Done executing thread " << pthread_self() << endl;*/
|
/*cout << ithread <<" Done executing thread " << pthread_self() << endl;*/
|
||||||
|
|
||||||
m_all_tasks_mutex.lock();
|
m_task_mutex.lock();
|
||||||
number_of_ongoing_tasks--;
|
number_of_ongoing_tasks--;
|
||||||
m_all_tasks_mutex.unlock();
|
m_task_mutex.unlock();
|
||||||
|
|
||||||
|
//if(zmqthreadpool) cout<<ithread <<" task done: "<<number_of_ongoing_tasks<<endl;
|
||||||
//last task and check m_tasks_loaded to ensure done only once
|
//last task and check m_tasks_loaded to ensure done only once
|
||||||
if((!number_of_ongoing_tasks) && m_tasks_loaded){
|
if((!number_of_ongoing_tasks) && m_tasks_loaded){
|
||||||
/*cout << ithread << " all tasks done."<<endl;*/
|
//if(zmqthreadpool) cout << ithread << " all tasks done."<<endl;
|
||||||
m_all_tasks_mutex.lock();
|
|
||||||
m_tasks_loaded = false;
|
m_tasks_loaded = false;
|
||||||
m_all_tasks_cond_var.signal();// wake up thread that is waiting for all tasks to be complete
|
|
||||||
m_all_tasks_mutex.unlock();
|
|
||||||
}
|
}
|
||||||
|
//if(zmqthreadpool) cout<<"***"<<ithread<<" semaphore done address post:"<<&semDone<<endl;
|
||||||
|
sem_post(&semDone);
|
||||||
delete task;
|
delete task;
|
||||||
/*cout << ithread << " task deleted" << endl;*/
|
/*cout << ithread << " task deleted" << endl;*/
|
||||||
}
|
}
|
||||||
@ -143,28 +152,37 @@ int ThreadPool::add_task(Task* task){
|
|||||||
// TODO: put a limit on how many tasks can be added at most
|
// TODO: put a limit on how many tasks can be added at most
|
||||||
m_tasks.push_back(task);
|
m_tasks.push_back(task);
|
||||||
number_of_ongoing_tasks++;
|
number_of_ongoing_tasks++;
|
||||||
|
number_of_total_tasks++;
|
||||||
m_task_cond_var.signal(); // wake up one thread that is waiting for a task to be available
|
m_task_cond_var.signal(); // wake up one thread that is waiting for a task to be available
|
||||||
m_task_mutex.unlock();
|
m_task_mutex.unlock();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ThreadPool::startExecuting(){
|
||||||
|
if(m_pool_size == 1)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/*cout << "waiting for tasks: locked. gonna wait" << endl;*/
|
||||||
|
m_tasks_loaded = true;
|
||||||
|
|
||||||
|
//giving all threads permission to start as all tasks have been added
|
||||||
|
//if(zmqthreadpool) cout<<"*** semaphore start address post:"<<&semStart<<endl;
|
||||||
|
for(int i=0;i<number_of_total_tasks;i++)
|
||||||
|
sem_post(&semStart);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
void ThreadPool::wait_for_tasks_to_complete(){
|
void ThreadPool::wait_for_tasks_to_complete(){
|
||||||
if(m_pool_size == 1)
|
if(m_pool_size == 1)
|
||||||
return;
|
return;
|
||||||
m_all_tasks_mutex.lock();
|
//if(zmqthreadpool) cout<<"waiting for all tasks to be done "<<endl;
|
||||||
/*cout << "waiting for tasks: locked. gonna wait" << endl;*/
|
//if(zmqthreadpool) cout<<"*** semaphore done address wait:"<<&semDone<<endl;
|
||||||
m_tasks_loaded = true;
|
for(int i=0;i<number_of_total_tasks;i++)
|
||||||
|
sem_wait(&semDone);
|
||||||
//giving all threads permission to start as all tasks have been added
|
number_of_total_tasks = 0;
|
||||||
//using a different variable as number_of_ongoing_tasks is likely to change during the loop
|
//if(zmqthreadpool) cout<<"complete"<<endl<<endl;
|
||||||
int totalnumtasks = number_of_ongoing_tasks;
|
|
||||||
for(int i=0;i<totalnumtasks;i++)
|
|
||||||
sem_post(&semStart);
|
|
||||||
|
|
||||||
while ((m_pool_state != STOPPED) && m_tasks_loaded) {
|
|
||||||
m_all_tasks_cond_var.wait(m_all_tasks_mutex.get_mutex_ptr());
|
|
||||||
}
|
|
||||||
/*cout << "waiting for tasks:totall out, must be locked again" << endl;*/
|
|
||||||
m_all_tasks_mutex.unlock();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ThreadPool::setzeromqThread(){
|
||||||
|
zmqthreadpool = true;
|
||||||
|
}
|
||||||
|
@ -26,7 +26,9 @@ public:
|
|||||||
int destroy_threadpool();
|
int destroy_threadpool();
|
||||||
void* execute_thread();
|
void* execute_thread();
|
||||||
int add_task(Task* task);
|
int add_task(Task* task);
|
||||||
|
void startExecuting();
|
||||||
void wait_for_tasks_to_complete();
|
void wait_for_tasks_to_complete();
|
||||||
|
void setzeromqThread();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int m_pool_size;
|
int m_pool_size;
|
||||||
@ -36,16 +38,16 @@ private:
|
|||||||
std::deque<Task*> m_tasks;
|
std::deque<Task*> m_tasks;
|
||||||
volatile int m_pool_state;
|
volatile int m_pool_state;
|
||||||
|
|
||||||
Mutex m_all_tasks_mutex;
|
|
||||||
CondVar m_all_tasks_cond_var;
|
|
||||||
bool m_tasks_loaded;
|
bool m_tasks_loaded;
|
||||||
bool thread_started;
|
bool thread_started;
|
||||||
int current_thread_number;
|
int current_thread_number;
|
||||||
|
|
||||||
//volatile uint64_t tasks_done_mask;
|
//volatile uint64_t tasks_done_mask;
|
||||||
volatile int number_of_ongoing_tasks;
|
volatile int number_of_ongoing_tasks;
|
||||||
|
volatile int number_of_total_tasks;
|
||||||
|
|
||||||
sem_t semStart;
|
sem_t semStart;
|
||||||
|
sem_t semDone;
|
||||||
|
bool zmqthreadpool;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user