diff --git a/slsReceiverSoftware/Makefile b/slsReceiverSoftware/Makefile index 632609d1c..097c283f9 100644 --- a/slsReceiverSoftware/Makefile +++ b/slsReceiverSoftware/Makefile @@ -16,6 +16,9 @@ DFLAGS= -g -DDACS_INT -DSLS_RECEIVER_UDP_FUNCTIONS #-DVERBOSE INCLUDES?= $(INCLUDESRXR) -I include/ -I ../slsDetectorCalibration +############################################################## +# ZMQ specific: in this case, you need ZMQ libraries already included in this package +########################################################### LIBZMQDIR = include LIBZMQ = -L$(LIBZMQDIR) -Wl,-rpath=$(LIBZMQDIR) -lzmq diff --git a/slsReceiverSoftware/include/DataProcessor.h b/slsReceiverSoftware/include/DataProcessor.h index 0a9f5a5e1..007b15cbc 100644 --- a/slsReceiverSoftware/include/DataProcessor.h +++ b/slsReceiverSoftware/include/DataProcessor.h @@ -14,6 +14,7 @@ class GeneralData; class Fifo; class File; +class DataStreamer; #include @@ -24,15 +25,14 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { * Constructor * Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofDataProcessors * @param f address of Fifo pointer - * @param s pointer to receiver status - * @param m pointer to mutex for status * @param ftype pointer to file format type * @param fwenable pointer to file writer enable + * @param dsEnable pointer to data stream enable * @param cbaction pointer to call back action * @param dataReadycb pointer to data ready call back function * @param pDataReadycb pointer to arguments of data ready call back function */ - DataProcessor(Fifo*& f, runStatus* s, pthread_mutex_t* m, fileFormat* ftype, bool* fwenable, + DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, int* cbaction, void (*dataReadycb)(int, char*, int, FILE*, char*, void*), void *pDataReadycb); @@ -57,6 +57,10 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { */ static uint64_t GetRunningMask(); + /** + * Reset RunningMask + */ + static void ResetRunningMask(); //*** non static functions *** //*** getters *** @@ -124,6 +128,13 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { */ void SetGeneralData(GeneralData* g); + /** + * Set thread priority + * @priority priority + * @returns OK or FAIL + */ + int SetThreadPriority(int priority); + /** * Set File Format * @param f file format @@ -235,8 +246,8 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { /** Fifo structure */ Fifo* fifo; - - // individual members + /** Data Stream Enable */ + bool* dataStreamEnable; /** Aquisition Started flag */ bool acquisitionStartedFlag; @@ -244,12 +255,6 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { /** Measurement Started flag */ bool measurementStartedFlag; - /** Receiver Status */ - runStatus* status; - - /** Status mutex */ - pthread_mutex_t* statusMutex; - /**Number of complete frames caught for an entire acquisition (including all scans) */ uint64_t numTotalFramesCaught; diff --git a/slsReceiverSoftware/include/DataStreamer.h b/slsReceiverSoftware/include/DataStreamer.h index b8a748771..e8150f1c5 100644 --- a/slsReceiverSoftware/include/DataStreamer.h +++ b/slsReceiverSoftware/include/DataStreamer.h @@ -9,14 +9,23 @@ #include "ThreadObject.h" +class GeneralData; +class Fifo; +class DataStreamer; +class ZmqSocket; + class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { public: /** * Constructor * Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofDataStreamers + * @param f address of Fifo pointer + * @param dr pointer to dynamic range + * @param freq poiner to streaming frequency + * @param timer poiner to timer if streaming frequency is random */ - DataStreamer(); + DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer); /** * Destructor @@ -25,18 +34,30 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { ~DataStreamer(); + //*** static functions *** /** * Get RunningMask * @return RunningMask */ static uint64_t GetErrorMask(); + /** + * Get RunningMask + * @return RunningMask + */ + static uint64_t GetRunningMask(); + /** * Reset RunningMask */ static void ResetRunningMask(); + //*** non static functions *** + //*** getters *** + + + //*** setters *** /** * Set bit in RunningMask to allow thread to run */ @@ -47,6 +68,52 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { */ void StopRunning(); + /** + * Set Fifo pointer to the one given + * @param f address of Fifo pointer + */ + void SetFifo(Fifo*& f); + + /** + * Reset parameters for new acquisition (including all scans) + */ + void ResetParametersforNewAcquisition(); + + /** + * Reset parameters for new measurement (eg. for each scan) + */ + void ResetParametersforNewMeasurement(); + + /** + * Create Part1 of Json Header which includes common attributes in an acquisition + */ + void CreateHeaderPart1(); + + /** + * Set GeneralData pointer to the one given + * @param g address of GeneralData (Detector Data) pointer + */ + void SetGeneralData(GeneralData* g); + + /** + * Set thread priority + * @priority priority + * @returns OK or FAIL + */ + int SetThreadPriority(int priority); + + /** + * Creates Zmq Sockets + * @param dindex pointer to detector index + * @param nunits pointer to number of theads/ units per detector + * @return OK or FAIL + */ + int CreateZmqSockets(int* dindex, int* nunits); + + /** + * Shuts down and deletes Zmq Sockets + */ + void CloseZmqSocket(); private: @@ -62,13 +129,52 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { */ bool IsRunning(); + /** + * Record First Indices (firstAcquisitionIndex, firstMeasurementIndex) + * @param fnum frame index to record + */ + void RecordFirstIndices(uint64_t fnum); + /** * Thread Exeution for DataStreamer Class * Stream an image via zmq */ void ThreadExecution(); + /** + * Frees dummy buffer, + * reset running mask by calling StopRunning() + * @param buf address of pointer + */ + void StopProcessing(char* buf); + /** + * Process an image popped from fifo, + * write to file if fw enabled & update parameters + * @param buffer + */ + void ProcessAnImage(char* buf); + + /** + * This function should be called only in random frequency mode + * Checks if timer is done and ready to send data + * @returns true if ready to send data, else false + */ + bool CheckTimer(); + + /** + * This function should be called only in non random frequency mode + * Checks if count is done and ready to send data + * @returns true if ready to send data, else false + */ + bool CheckCount(); + + /** + * Create and send Json Header + * @param fnum frame number + * @returns 0 if error, else 1 + */ + int SendHeader(uint64_t fnum); /** type of thread */ static const std::string TypeName; @@ -84,5 +190,50 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { /** mutex to update static items among objects (threads)*/ static pthread_mutex_t Mutex; + + /** Json Header Format for each measurement part */ + static const char *jsonHeaderFormat_part1; + + /** Json Header Format */ + static const char *jsonHeaderFormat; + + /** GeneralData (Detector Data) object */ + const GeneralData* generalData; + + /** Fifo structure */ + Fifo* fifo; + + /** ZMQ Socket - Receiver to Client */ + ZmqSocket* zmqSocket; + + /** Pointer to dynamic range */ + uint32_t* dynamicRange; + + /** Pointer to Streaming frequency, if 0, sending random images with a timer */ + uint32_t* streamingFrequency; + + /** Pointer to the timer if Streaming frequency is random */ + uint32_t* streamingTimerInMs; + + /** Current frequency count */ + uint32_t currentFreqCount; + + /** timer beginning stamp for random streaming */ + struct timespec timerBegin; + + /** Current Json Header prefix*/ + char* currentHeader; + + /** Aquisition Started flag */ + bool acquisitionStartedFlag; + + /** Measurement Started flag */ + bool measurementStartedFlag; + + /** Frame Number of First Frame of an entire Acquisition (including all scans) */ + uint64_t firstAcquisitionIndex; + + /** Frame Number of First Frame for each real time acquisition (eg. for each scan) */ + uint64_t firstMeasurementIndex; }; diff --git a/slsReceiverSoftware/include/Fifo.h b/slsReceiverSoftware/include/Fifo.h index de627472e..e345d2e96 100644 --- a/slsReceiverSoftware/include/Fifo.h +++ b/slsReceiverSoftware/include/Fifo.h @@ -31,16 +31,16 @@ class Fifo : private virtual slsReceiverDefs { */ ~Fifo(); - /** - * Pops free address from fifoFree - */ - void GetNewAddress(char*& address); - /** * Frees the bound address by pushing into fifoFree */ void FreeAddress(char*& address); + /** + * Pops free address from fifoFree + */ + void GetNewAddress(char*& address); + /** * Pushes bound address into fifoBound */ @@ -51,6 +51,16 @@ class Fifo : private virtual slsReceiverDefs { */ void PopAddress(char*& address); + /** + * Pushes bound address into fifoStream + */ + void PushAddressToStream(char*& address); + + /** + * Pops bound address from fifoStream to stream data + */ + void PopAddressToStream(char*& address); + private: /** @@ -82,4 +92,6 @@ class Fifo : private virtual slsReceiverDefs { /** Circular Fifo pointing to addresses of freed data in memory */ CircularFifo* fifoFree; + /** Circular Fifo pointing to addresses of to be streamed data in memory */ + CircularFifo* fifoStream; }; diff --git a/slsReceiverSoftware/include/GeneralData.h b/slsReceiverSoftware/include/GeneralData.h index 15b38e7dd..c1cb9ec2f 100644 --- a/slsReceiverSoftware/include/GeneralData.h +++ b/slsReceiverSoftware/include/GeneralData.h @@ -428,7 +428,7 @@ private: packetsPerFrame = 256; imageSize = dataSize*packetsPerFrame; frameIndexMask = 0xffffff; - maxFramesPerFile = 5;//EIGER_MAX_FRAMES_PER_FILE; + maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE; fifoBufferSize = imageSize; fifoBufferHeaderSize= FIFO_HEADER_NUMBYTES + FILE_FRAME_HEADER_SIZE; defaultFifoDepth = 100; diff --git a/slsReceiverSoftware/include/HDF5FileStatic.h b/slsReceiverSoftware/include/HDF5FileStatic.h index 245c3d41f..86ac8cd5e 100644 --- a/slsReceiverSoftware/include/HDF5FileStatic.h +++ b/slsReceiverSoftware/include/HDF5FileStatic.h @@ -447,10 +447,10 @@ public: hid_t vdsdataset = H5Dcreate2 (fd, virtualDatasetname.c_str(), dtype, vdsDataspace, H5P_DEFAULT, dcpl, H5P_DEFAULT); if (vdsdataset >= 0){ - H5Sclose(vdsDataspace); - H5Sclose(srcDataspace); - H5Dclose(vdsdataset); - H5Fclose(fd); + H5Sclose(vdsDataspace);vdsDataspace = 0; + H5Sclose(srcDataspace);srcDataspace = 0; + H5Dclose(vdsdataset);vdsdataset = 0; + H5Fclose(fd);fd = 0; return 0; } else cprintf(RED, "could not create virtual dataset in virtual file %s\n", virtualfname.c_str()); @@ -468,6 +468,7 @@ public: } else cprintf(RED, "could not create dfal for virtual file %s\n", virtualfname.c_str()); H5Fclose(fd); + fd = 0; return 1; } diff --git a/slsReceiverSoftware/include/Listener.h b/slsReceiverSoftware/include/Listener.h index 68b9e6685..60fcca248 100644 --- a/slsReceiverSoftware/include/Listener.h +++ b/slsReceiverSoftware/include/Listener.h @@ -25,8 +25,10 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { * @param s pointer to receiver status * @param portno pointer to udp port number * @param e ethernet interface + * @param act pointer to activated + * @param nf pointer to number of images to catch */ - Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e); + Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf); /** * Destructor @@ -49,11 +51,9 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { static uint64_t GetRunningMask(); /** - * Set GeneralData pointer to the one given - * @param g address of GeneralData (Detector Data) pointer + * Reset RunningMask */ - static void SetGeneralData(GeneralData*& g); - + static void ResetRunningMask(); //*** non static functions *** @@ -94,7 +94,6 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { */ void StopRunning(); - /** * Set Fifo pointer to the one given * @param f address of Fifo pointer @@ -111,6 +110,19 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { */ void ResetParametersforNewMeasurement(); + /** + * Set GeneralData pointer to the one given + * @param g address of GeneralData (Detector Data) pointer + */ + void SetGeneralData(GeneralData*& g); + + /** + * Set thread priority + * @priority priority + * @returns OK or FAIL + */ + int SetThreadPriority(int priority); + /** * Creates UDP Sockets * @return OK or FAIL @@ -168,6 +180,13 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { */ uint32_t ListenToAnImage(char* buf); + /** + * Create an image (for deactivated detectors), + * @param buffer + * @returns image size or 0 + */ + uint32_t CreateAnImage(char* buf); + /** type of thread */ @@ -202,7 +221,7 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { /** Receiver Status */ runStatus* status; - /** UDP Sockets - Detector to Receiver */ + /** UDP Socket - Detector to Receiver */ genericSocket* udpSocket; /** UDP Port Number */ @@ -211,6 +230,12 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { /** ethernet interface */ char* eth; + /** if the detector is activated */ + int* activated; + + /** Number of Images to catch */ + uint64_t* numImages; + /**Number of complete Packets caught for an entire acquisition (including all scans) */ uint64_t numTotalPacketsCaught; diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 6ff7e4c0f..950daf940 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -206,6 +206,11 @@ private: */ void SetLocalNetworkParameters(); + /** + * Set Thread Priorities + */ + void SetThreadPriorities(); + /** * Set up the Fifo Structure for processing buffers * between listening and dataprocessor threads diff --git a/slsReceiverSoftware/include/genericSocket.h b/slsReceiverSoftware/include/genericSocket.h index 18593f40d..c344c8eb8 100644 --- a/slsReceiverSoftware/include/genericSocket.h +++ b/slsReceiverSoftware/include/genericSocket.h @@ -62,7 +62,6 @@ class sockaddr_in; #include - using namespace std; #define DEFAULT_PACKET_SIZE 1286 @@ -72,7 +71,7 @@ using namespace std; #define DEFAULT_BACKLOG 5 #define DEFAULT_UDP_PORTNO 50001 #define DEFAULT_GUI_PORTNO 65000 -#define DEFAULT_ZMQ_PORTNO 70001 +//#define DEFAULT_ZMQ_PORTNO 70001 class genericSocket{ @@ -102,13 +101,10 @@ enum communicationProtocol{ { memset(&serverAddress, 0, sizeof(serverAddress)); memset(&clientAddress, 0, sizeof(clientAddress)); - // strcpy(hostname,host_ip_or_name); - strcpy(lastClientIP,"none"); strcpy(thisClientIP,"none1"); strcpy(dummyClientIP,"dummy"); differentClients = 0; - struct hostent *hostInfo = gethostbyname(host_ip_or_name); if (hostInfo == NULL){ cerr << "Exiting: Problem interpreting host: " << host_ip_or_name << "\n"; @@ -170,13 +166,13 @@ enum communicationProtocol{ /* // you can specify an IP address: */ /* // or you can let it automatically select one: */ /* myaddr.sin_addr.s_addr = INADDR_ANY; */ - - strcpy(lastClientIP,"none"); strcpy(thisClientIP,"none1"); strcpy(dummyClientIP,"dummy"); differentClients = 0; + + if(serverAddress.sin_port == htons(port_number)){ socketDescriptor = -10; return; @@ -209,8 +205,7 @@ enum communicationProtocol{ if (string(ip)!=string("0.0.0.0")) { - if (inet_pton(AF_INET, ip, &(serverAddress.sin_addr))) - ; + if (inet_pton(AF_INET, ip, &(serverAddress.sin_addr))); else serverAddress.sin_addr.s_addr = htonl(INADDR_ANY); } @@ -718,7 +713,6 @@ enum communicationProtocol{ int nsent; int total_sent; int header_packet_size; - // pthread_mutex_t mp; }; diff --git a/slsReceiverSoftware/include/receiver_defs.h b/slsReceiverSoftware/include/receiver_defs.h index 9ce1c35f9..3f72ec253 100755 --- a/slsReceiverSoftware/include/receiver_defs.h +++ b/slsReceiverSoftware/include/receiver_defs.h @@ -35,3 +35,7 @@ #define DUMMY_PACKET_VALUE 0xFFFFFFFF +#define LISTENER_PRIORITY 99 +#define PROCESSOR_PRIORITY 90 +#define STREAMER_PRIORITY 80 +#define TCP_PRIORITY 50 diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index 3ed9355e6..a9a65d764 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -13,8 +13,10 @@ #ifdef HDF5C #include "HDF5File.h" #endif +#include "DataStreamer.h" #include +#include #include using namespace std; @@ -29,17 +31,17 @@ uint64_t DataProcessor::RunningMask(0x0); pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER; -DataProcessor::DataProcessor(Fifo*& f, runStatus* s, pthread_mutex_t* m, fileFormat* ftype, bool* fwenable, +DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, int* cbaction, void (*dataReadycb)(int, char*, int, FILE*, char*, void*), void *pDataReadycb) : + ThreadObject(NumberofDataProcessors), generalData(0), fifo(f), + dataStreamEnable(dsEnable), acquisitionStartedFlag(false), measurementStartedFlag(false), - status(s), - statusMutex(m), numTotalFramesCaught(0), numFramesCaught(0), firstAcquisitionIndex(0), @@ -59,7 +61,7 @@ DataProcessor::DataProcessor(Fifo*& f, runStatus* s, pthread_mutex_t* m, fileFor } NumberofDataProcessors++; - FILE_LOG (logDEBUG) << "Number of DataProcessors: " << NumberofDataProcessors << endl; + FILE_LOG (logDEBUG) << "Number of DataProcessors: " << NumberofDataProcessors; } @@ -79,6 +81,10 @@ uint64_t DataProcessor::GetRunningMask() { return RunningMask; } +void DataProcessor::ResetRunningMask() { + RunningMask = 0x0; +} + /** non static functions */ /** getters */ string DataProcessor::GetType(){ @@ -126,7 +132,6 @@ void DataProcessor::StopRunning() { pthread_mutex_unlock(&Mutex); } - void DataProcessor::SetFifo(Fifo*& f) { fifo = f; } @@ -142,11 +147,6 @@ void DataProcessor::ResetParametersforNewMeasurement(){ numFramesCaught = 0; firstMeasurementIndex = 0; measurementStartedFlag = false; - if(RunningMask){ - pthread_mutex_lock(&Mutex); - RunningMask = 0x0; - pthread_mutex_unlock(&Mutex); - } } @@ -185,6 +185,15 @@ void DataProcessor::SetGeneralData(GeneralData* g) { } +int DataProcessor::SetThreadPriority(int priority) { + struct sched_param param; + param.sched_priority = priority; + if (pthread_setschedparam(thread, SCHED_RR, ¶m) == EPERM) + return FAIL; + return OK; +} + + void DataProcessor::SetFileFormat(const fileFormat f) { if (file->GetFileType() != f) { //remember the pointer values before they are destroyed @@ -257,14 +266,22 @@ void DataProcessor::ThreadExecution() { ProcessAnImage(buffer + FIFO_HEADER_NUMBYTES); - //free - fifo->FreeAddress(buffer); + //stream or free + if (*dataStreamEnable) + fifo->PushAddressToStream(buffer); + else + fifo->FreeAddress(buffer); } void DataProcessor::StopProcessing(char* buf) { - fifo->FreeAddress(buf); + //stream or free + if (*dataStreamEnable) + fifo->PushAddressToStream(buf); + else + fifo->FreeAddress(buf); + file->CloseCurrentFile(); StopRunning(); cprintf(BLUE,"%d: Processing Completed\n", index); diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index b5304ae88..7b085714f 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -5,8 +5,12 @@ #include "DataStreamer.h" +#include "GeneralData.h" +#include "Fifo.h" +#include "ZmqSocket.h" #include +#include using namespace std; const string DataStreamer::TypeName = "DataStreamer"; @@ -19,12 +23,36 @@ uint64_t DataStreamer::RunningMask(0x0); pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER; +const char* DataStreamer::jsonHeaderFormat_part1 = + "{" + "\"htype\":[\"chunk-1.0\"], " + "\"type\":\"%s\", " + "\"shape\":%s, "; -DataStreamer::DataStreamer() : - ThreadObject(NumberofDataStreamers) +const char* DataStreamer::jsonHeaderFormat = + "%s" + "\"acqIndex\":%lld, " + "\"fIndex\":%lld, " + "\"subfnum\":%lld, " + "\"fname\":\"%s\"}"; + + +DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer) : + ThreadObject(NumberofDataStreamers), + generalData(0), + fifo(f), + zmqSocket(0), + dynamicRange(dr), + streamingFrequency(freq), + streamingTimerInMs(timer), + currentFreqCount(0), + currentHeader(0), + acquisitionStartedFlag(false), + measurementStartedFlag(false), + firstAcquisitionIndex(0), + firstMeasurementIndex(0) { - FILE_LOG (logDEBUG) << __AT__ << " called"; - + memset(timerBegin, 0xFF, sizeof(timespec)); if(ThreadObject::CreateThread()){ pthread_mutex_lock(&Mutex); ErrorMask ^= (1<nPixelsX, generalData->nPixelsY); + + sprintf(currentHeader, jsonHeaderFormat_part1, type, shape); +} + + +void DataStreamer::RecordFirstIndices(uint64_t fnum) { + measurementStartedFlag = true; + firstMeasurementIndex = fnum; + + //start of entire acquisition + if (!acquisitionStartedFlag) { + acquisitionStartedFlag = true; + firstAcquisitionIndex = fnum; + } + +#ifdef VERBOSE + cprintf(BLUE,"%d First Acquisition Index:%lld\tFirst Measurement Index:%lld\n", + index, (long long int)firstAcquisitionIndex, (long long int)firstMeasurementIndex); +#endif +} + + +void DataStreamer::SetGeneralData(GeneralData* g) { + generalData = g; +#ifdef VERY_VERBOSE + generalData->Print(); +#endif +} + +int DataStreamer::SetThreadPriority(int priority) { + struct sched_param param; + param.sched_priority = priority; + if (pthread_setschedparam(thread, SCHED_RR, ¶m) == EPERM) + return FAIL; + return OK; +} + + +int DataStreamer::CreateZmqSockets(int* dindex, int* nunits) { + uint32_t portnum = DEFAULT_ZMQ_PORTNO + ((*dindex) * (*nunits) + index); + printf("%d Streamer: Port number: %d\n", index, portnum); + + zmqSocket = new ZmqSocket(portnum); + if (zmqSocket->GetErrorStatus()) { + cprintf(RED, "Error: Could not create Zmq socket on port %d for Streamer %d\n", portnum, index); + return FAIL; + } + printf("%d Streamer: Zmq Server started at %s\n",zmqSocket->GetZmqServerAddress()); + return OK; +} + + +void DataStreamer::CloseZmqSocket() { + if (zmqSocket) { + delete zmqSocket; + zmqSocket = 0; + } +} + + void DataStreamer::ThreadExecution() { - FILE_LOG (logDEBUG) << __AT__ << " called"; + char* buffer=0; + fifo->PopAddressToStream(buffer); +#ifdef FIFODEBUG + if (!index) cprintf(BLUE,"DataProcessor %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer); +#endif + + //check dummy + uint32_t numBytes = (uint32_t)(*((uint32_t*)buffer)); + if (numBytes == DUMMY_PACKET_VALUE) { + StopProcessing(buffer); + return; + } + + ProcessAnImage(buffer + FIFO_HEADER_NUMBYTES); + + //free + fifo->FreeAddress(buffer); } + +void DataStreamer::StopProcessing(char* buf) { + fifo->FreeAddress(buf); + StopRunning(); + cprintf(MAGENTA,"%d: Streaming Completed\n", index); +} + + +void DataStreamer::ProcessAnImage(char* buf) { + uint64_t fnum = (*((uint64_t*)buf)); +#ifdef VERBOSE + if (!index) cprintf(MAGENTA,"DataStreamer %d: fnum:%lld\n", index, (long long int)fnum); +#endif + + if (!measurementStartedFlag) { +#ifdef VERBOSE + if (!index) cprintf(MAGENTA,"DataStreamer %d: fnum:%lld\n", index, (long long int)fnum); +#endif + RecordFirstIndices(fnum); + //restart timer + clock_gettime(CLOCK_REALTIME, &timerBegin); + //to send first image + currentFreqCount = *streamingFrequency; + } + + //skip + if (!(*streamingFrequency)) { + if (!CheckTimer()) + return; + } else { + if (!CheckCount()) + return; + } + + if(!SendHeader(fnum)) + cprintf(RED,"Error: Could not send zmq header for fnum %lld and streamer %d\n", + (long long int) fnum, index); + + + Send Datat(); +} + +bool DataStreamer::CheckTimer() { + struct timespec end; + clock_gettime(CLOCK_REALTIME, &end); +#ifdef VERBOSE + cprintf(BLUE,"%d Timer elapsed time:%f seconds\n", index, ( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0); +#endif + //still less than streaming timer, keep waiting + if((( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0) < (streamingTimerInMs/1000)) + return false; + + //restart timer + clock_gettime(CLOCK_REALTIME, &timerBegin); + return true; +} + +bool DataStreamer::CheckCount() { + if (currentFreqCount == *streamingFrequency ) { + currentFreqCount = 1; + return true; + } + currentFreqCount++; + return false; +} + +int DataStreamer::SendHeader(uint64_t fnum) { + uint64_t frameIndex = fnum - firstMeasurementIndex; + uint64_t acquisitionIndex = fnum - firstAcquisitionIndex; + uint64_t subframeIndex = -1; /* subframe to be included in fifo buffer? */ + char buf[1000]; + int len = sprintf(buf, jsonHeaderFormat, jsonHeaderFormat_part1, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); + return zmqSocket->SendDataOnly(buf, len); +} diff --git a/slsReceiverSoftware/src/Fifo.cpp b/slsReceiverSoftware/src/Fifo.cpp index 1a7f03eb4..10f68386d 100644 --- a/slsReceiverSoftware/src/Fifo.cpp +++ b/slsReceiverSoftware/src/Fifo.cpp @@ -16,7 +16,8 @@ int Fifo::NumberofFifoClassObjects(0); Fifo::Fifo(uint32_t fifoItemSize, uint32_t fifoDepth, bool &success): memory(0), fifoBound(0), - fifoFree(0) { + fifoFree(0), + fifoStream(0){ FILE_LOG (logDEBUG) << __AT__ << " called"; index = NumberofFifoClassObjects++; if(CreateFifos(fifoItemSize, fifoDepth) == FAIL) @@ -41,6 +42,7 @@ int Fifo::CreateFifos(uint32_t fifoItemSize, uint32_t fifoDepth) { //create fifos fifoBound = new CircularFifo(fifoDepth); fifoFree = new CircularFifo(fifoDepth); + fifoStream = new CircularFifo(fifoDepth); //allocate memory memory = (char*) calloc (fifoItemSize * fifoDepth, sizeof(char)); if (memory == NULL){ @@ -75,6 +77,10 @@ void Fifo::DestroyFifos(){ delete fifoFree; fifoFree = 0; } + if (fifoStream) { + delete fifoStream; + fifoStream = 0; + } if(memory) { free(memory); memory = 0; @@ -82,15 +88,14 @@ void Fifo::DestroyFifos(){ } +void Fifo::FreeAddress(char*& address) { + while(!fifoFree->push(address)); +} void Fifo::GetNewAddress(char*& address) { fifoFree->pop(address); } -void Fifo::FreeAddress(char*& address) { - while(!fifoFree->push(address)); -} - void Fifo::PushAddress(char*& address) { while(!fifoBound->push(address)); } @@ -99,3 +104,11 @@ void Fifo::PopAddress(char*& address) { fifoBound->pop(address); } +void Fifo::PushAddressToStream(char*& address) { + while(!fifoStream->push(address)); +} + +void Fifo::PopAddressToStream(char*& address) { + fifoStream->pop(address); +} + diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 39ff4b3b1..d05d09c5c 100644 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -12,6 +12,7 @@ #include "genericSocket.h" #include +#include #include using namespace std; @@ -28,7 +29,7 @@ pthread_mutex_t Listener::Mutex = PTHREAD_MUTEX_INITIALIZER; const GeneralData* Listener::generalData(0); -Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e) : +Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf) : ThreadObject(NumberofListeners), fifo(f), acquisitionStartedFlag(false), @@ -37,6 +38,8 @@ Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e) : udpSocket(0), udpPortNumber(portno), eth(e), + activated(act), + numImages(nf), numTotalPacketsCaught(0), numPacketsCaught(0), firstAcquisitionIndex(0), @@ -53,7 +56,7 @@ Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e) : pthread_mutex_unlock(&Mutex); } NumberofListeners++; - FILE_LOG (logDEBUG) << "Number of Listeners: " << NumberofListeners << endl; + FILE_LOG (logDEBUG) << "Number of Listeners: " << NumberofListeners; } @@ -74,16 +77,11 @@ uint64_t Listener::GetRunningMask() { return RunningMask; } -void Listener::SetGeneralData(GeneralData*& g) { - FILE_LOG (logDEBUG) << __AT__ << " called"; - generalData = g; -#ifdef VERY_VERBOSE - generalData->Print(); -#endif +void Listener::ResetRunningMask() { + RunningMask = 0x0; } - /** non static functions */ /** getters */ string Listener::GetType(){ @@ -150,12 +148,6 @@ void Listener::ResetParametersforNewMeasurement(){ if (listeningPacket) delete listeningPacket; listeningPacket = new char[generalData->packetSize]; - - if(RunningMask){ - pthread_mutex_lock(&Mutex); - RunningMask = 0x0; - pthread_mutex_unlock(&Mutex); - } } @@ -172,17 +164,35 @@ void Listener::RecordFirstIndices(uint64_t fnum) { acquisitionStartedFlag = true; firstAcquisitionIndex = fnum; } - if (!index) cprintf(MAGENTA,"%d First Acquisition Index:%lld\n" + if (!index) cprintf(GREEN,"%d First Acquisition Index:%lld\n" "%d First Measurement Index:%lld\n", index, (long long int)firstAcquisitionIndex, index, (long long int)firstMeasurementIndex); } +void Listener::SetGeneralData(GeneralData*& g) { + generalData = g; +#ifdef VERY_VERBOSE + generalData->Print(); +#endif +} + + +int Listener::SetThreadPriority(int priority) { + struct sched_param param; + param.sched_priority = priority; + if (pthread_setschedparam(thread, SCHED_RR, ¶m) == EPERM) + return FAIL; + return OK; +} int Listener::CreateUDPSockets() { ShutDownUDPSocket(); + if (!(*activated)) + return OK; + //if eth is mistaken with ip address if (strchr(eth,'.') != NULL){ strcpy(eth,""); @@ -226,7 +236,7 @@ void Listener::ThreadExecution() { #endif //udpsocket doesnt exist - if (!udpSocket) { + if (*activated && !udpSocket) { FILE_LOG(logERROR) << "Listening_Thread " << index << ": UDP Socket not created or shut down earlier"; (*((uint32_t*)buffer)) = 0; StopListening(buffer); @@ -234,17 +244,25 @@ void Listener::ThreadExecution() { } //get data - if (*status != TRANSMITTING) - rc = ListenToAnImage(buffer + generalData->fifoBufferHeaderSize); + if (*status != TRANSMITTING) { + if (*activated) + rc = ListenToAnImage(buffer + generalData->fifoBufferHeaderSize); + else + rc = CreateAnImage(buffer + generalData->fifoBufferHeaderSize); + } //done acquiring - if (*status == TRANSMITTING) { + if (*status == TRANSMITTING || ((!(*activated)) && (rc == 0))) { StopListening(buffer); return; } //error check - if (rc <= 0) cprintf(BG_RED,"Error:(Weird), UDP Sockets not shut down, but received nothing\n"); + if (rc <= 0) { + cprintf(BG_RED,"Error:(Weird), UDP Sockets not shut down, but received nothing\n"); + fifo->FreeAddress(buffer); + return; + } (*((uint32_t*)buffer)) = rc; (*((uint64_t*)(buffer + FIFO_HEADER_NUMBYTES ))) = currentFrameIndex; @@ -311,22 +329,34 @@ uint32_t Listener::ListenToAnImage(char* buf) { //future packet - if(fnum != currentFrameIndex) { + if (fnum != currentFrameIndex) { carryOverFlag = true; memcpy(carryOverPacket,listeningPacket, generalData->packetSize); return generalData->imageSize; } - //copy packet and update fnum + //copy packet memcpy(buf + (pnum * dsize), listeningPacket + generalData->headerSizeinPacket, dsize); - (*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE))) = fnum; - } return generalData->imageSize; } +uint32_t Listener::CreateAnImage(char* buf) { + if (!measurementStartedFlag) + RecordFirstIndices(0); + if (currentFrameIndex == *numImages) + return 0; + //update parameters + numPacketsCaught++; //record immediately to get more time before socket shutdown + numTotalPacketsCaught++; + + //reset data to -1 + memset(buf, 0xFF, generalData->dataSize); + + return generalData->imageSize; +} diff --git a/slsReceiverSoftware/src/ThreadObject.cpp b/slsReceiverSoftware/src/ThreadObject.cpp index a01c5e1aa..e1798517a 100644 --- a/slsReceiverSoftware/src/ThreadObject.cpp +++ b/slsReceiverSoftware/src/ThreadObject.cpp @@ -80,7 +80,7 @@ void ThreadObject::RunningThread() { ThreadExecution(); - }/*--end of inner loop */ + }//end of inner loop //wait till the next acquisition @@ -91,7 +91,7 @@ void ThreadObject::RunningThread() { pthread_exit(NULL); } - }/*--end of loop for each acquisition (outer loop) */ + }//end of outer loop } diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index ddd1b3a33..3c5572609 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -13,27 +13,24 @@ #include //system #include //strcpy +#include //eperm using namespace std; /** cosntructor & destructor */ UDPStandardImplementation::UDPStandardImplementation() { - InitializeMembers(); } UDPStandardImplementation::~UDPStandardImplementation() { - DeleteMembers(); } void UDPStandardImplementation::DeleteMembers() { - - if (generalData) { delete generalData; generalData=0;} for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) delete(*it); @@ -51,8 +48,6 @@ void UDPStandardImplementation::DeleteMembers() { void UDPStandardImplementation::InitializeMembers() { - - UDPBaseImplementation::initializeMembers(); acquisitionPeriod = SAMPLE_TIME_IN_NS; @@ -73,8 +68,6 @@ void UDPStandardImplementation::InitializeMembers() { /*** Overloaded Functions called by TCP Interface ***/ uint64_t UDPStandardImplementation::getTotalFramesCaught() const { - - uint64_t sum = 0; uint32_t flagsum = 0; @@ -91,8 +84,6 @@ uint64_t UDPStandardImplementation::getTotalFramesCaught() const { } uint64_t UDPStandardImplementation::getFramesCaught() const { - - uint64_t sum = 0; uint32_t flagsum = 0; @@ -108,8 +99,6 @@ uint64_t UDPStandardImplementation::getFramesCaught() const { } int64_t UDPStandardImplementation::getAcquisitionIndex() const { - - uint64_t sum = 0; uint32_t flagsum = 0; @@ -126,8 +115,6 @@ int64_t UDPStandardImplementation::getAcquisitionIndex() const { void UDPStandardImplementation::setFileFormat(const fileFormat f){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - switch(f){ #ifdef HDF5C case HDF5: @@ -147,8 +134,6 @@ void UDPStandardImplementation::setFileFormat(const fileFormat f){ void UDPStandardImplementation::setFileName(const char c[]) { - - if (strlen(c)) { strcpy(fileName, c); //automatically update fileName in Filewriter (pointer) /*int detindex = -1; @@ -169,8 +154,6 @@ void UDPStandardImplementation::setFileName(const char c[]) { int UDPStandardImplementation::setShortFrameEnable(const int i) { - - if (myDetectorType != GOTTHARD) { cprintf(RED, "Error: Can not set short frame for this detector\n"); return FAIL; @@ -193,6 +176,8 @@ int UDPStandardImplementation::setShortFrameEnable(const int i) { (*it)->SetGeneralData(generalData); for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) (*it)->SetGeneralData(generalData); + for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) + (*it)->SetGeneralData(generalData); } FILE_LOG (logINFO) << "Short Frame Enable: " << shortFrameEnable; return OK; @@ -200,8 +185,6 @@ int UDPStandardImplementation::setShortFrameEnable(const int i) { int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) { - - if (frameToGuiFrequency != freq) { frameToGuiFrequency = freq; @@ -222,8 +205,6 @@ int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) { int UDPStandardImplementation::setDataStreamEnable(const bool enable) { - - if (dataStreamEnable != enable) { dataStreamEnable = enable; @@ -233,16 +214,27 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) { dataStreamer.clear(); if (enable) { - for ( int i=0; i < numThreads; ++i ) { - dataStreamer.push_back(new DataStreamer()); - if (DataStreamer::GetErrorMask()) { - cprintf(BG_RED,"Error: Could not create data callback threads\n"); - for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) - delete(*it); - dataStreamer.clear(); - return FAIL; + bool error = false; + for ( int i = 0; i < numThreads; ++i ) { + dataStreamer.push_back(new DataStreamer(fifo[i], &frameToGuiFrequency, &frameToGuiTimerinMS, &dynamicRange)); + dataStreamer[i]->SetGeneralData(generalData); + if (dataStreamer[i]->CreateZmqSockets() == FAIL) { + error = true; + break; } } + if (DataStreamer::GetErrorMask() || error) { + if (DataStreamer::GetErrorMask()) + cprintf(BG_RED,"Error: Could not create data callback threads\n"); + else + cprintf(BG_RED,"Error: Could not create zmq sockets\n"); + for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) + delete(*it); + dataStreamer.clear(); + dataStreamEnable = false; + return FAIL; + } + SetThreadPriorities(); } } FILE_LOG (logINFO) << "Data Send to Gui: " << dataStreamEnable; @@ -251,8 +243,6 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) { int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i) { - - if (acquisitionPeriod != i) { acquisitionPeriod = i; @@ -273,8 +263,6 @@ int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i) { int UDPStandardImplementation::setAcquisitionTime(const uint64_t i) { - - if (acquisitionTime != i) { acquisitionTime = i; @@ -295,8 +283,6 @@ int UDPStandardImplementation::setAcquisitionTime(const uint64_t i) { int UDPStandardImplementation::setNumberOfFrames(const uint64_t i) { - - if (numberOfFrames != i) { numberOfFrames = i; @@ -317,8 +303,6 @@ int UDPStandardImplementation::setNumberOfFrames(const uint64_t i) { int UDPStandardImplementation::setDynamicRange(const uint32_t i) { - - if (dynamicRange != i) { dynamicRange = i; @@ -335,8 +319,6 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i) { int UDPStandardImplementation::setTenGigaEnable(const bool b) { - - if (tengigaEnable != b) { tengigaEnable = b; //side effects @@ -352,8 +334,6 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b) { int UDPStandardImplementation::setFifoDepth(const uint32_t i) { - - if (fifoDepth != i) { fifoDepth = i; @@ -368,8 +348,6 @@ int UDPStandardImplementation::setFifoDepth(const uint32_t i) { int UDPStandardImplementation::setDetectorType(const detectorType d) { - - FILE_LOG (logDEBUG) << "Setting receiver type"; DeleteMembers(); @@ -415,8 +393,8 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) { //create threads for ( int i=0; i < numThreads; ++i ) { - listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i], eth)); - dataProcessor.push_back(new DataProcessor(fifo[i], &status, &statusMutex, &fileFormatType, &fileWriteEnable, + listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames)); + dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable, &callbackAction, rawDataReadyCallBack,pRawDataReady)); if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) { FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")"; @@ -436,6 +414,9 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) { for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) { (*it)->SetGeneralData(generalData); } + + SetThreadPriorities(); + FILE_LOG (logDEBUG) << " Detector type set to " << getDetectorType(d); return OK; } @@ -444,7 +425,6 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) { void UDPStandardImplementation::setDetectorPositionId(const int i){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; detID = i; FILE_LOG(logINFO) << "Detector Position Id:" << detID; @@ -456,17 +436,19 @@ void UDPStandardImplementation::setDetectorPositionId(const int i){ void UDPStandardImplementation::resetAcquisitionCount() { - - for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) (*it)->ResetParametersforNewAcquisition(); for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) (*it)->ResetParametersforNewAcquisition(); + for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) + (*it)->ResetParametersforNewAcquisition(); + FILE_LOG (logINFO) << "Acquisition Count has been reset"; } + int UDPStandardImplementation::VerifyCallBackAction() { /** file path and file index not required?? or need to include detector index? do they need the datasize? its given for write data anyway */ @@ -496,7 +478,6 @@ int UDPStandardImplementation::VerifyCallBackAction() { } int UDPStandardImplementation::startReceiver(char *c) { - ResetParametersforNewMeasurement(); //listener @@ -541,7 +522,6 @@ int UDPStandardImplementation::startReceiver(char *c) { void UDPStandardImplementation::stopReceiver(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logINFO) << "Stopping Receiver"; //set status to transmitting @@ -554,7 +534,9 @@ void UDPStandardImplementation::stopReceiver(){ while(DataProcessor::GetRunningMask()){ usleep(5000); } - + while(DataStreamer::GetRunningMask()){ + usleep(5000); + } pthread_mutex_lock(&statusMutex); status = RUN_FINISHED; @@ -579,6 +561,8 @@ void UDPStandardImplementation::stopReceiver(){ cprintf(GREEN, "Last Frame Number Caught :%lld\n",(long long int)listener[i]->GetLastFrameIndexCaught()); } } + if(!activated) + cprintf(RED,"Note: Deactivated Receiver\n"); //callback if (acquisitionFinishedCallBack) acquisitionFinishedCallBack((int)(tot/numThreads), pAcquisitionFinished); @@ -597,8 +581,6 @@ void UDPStandardImplementation::stopReceiver(){ void UDPStandardImplementation::startReadout(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - if(status == RUNNING){ //needs to wait for packets only if activated @@ -647,7 +629,6 @@ void UDPStandardImplementation::startReadout(){ void UDPStandardImplementation::shutDownUDPSockets() { - for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) (*it)->ShutDownUDPSocket(); } @@ -655,7 +636,6 @@ void UDPStandardImplementation::shutDownUDPSockets() { void UDPStandardImplementation::closeFiles() { - for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) (*it)->CloseFiles(); } @@ -663,8 +643,6 @@ void UDPStandardImplementation::closeFiles() { void UDPStandardImplementation::SetLocalNetworkParameters() { - - //to increase socket receiver buffer size and max length of input queue by changing kernel settings if (myDetectorType == EIGER) return; @@ -691,10 +669,46 @@ void UDPStandardImplementation::SetLocalNetworkParameters() { +void UDPStandardImplementation::SetThreadPriorities() { + + for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it){ + if ((*it)->SetThreadPriority(LISTENER_PRIORITY) == FAIL) { + FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option."; + return; + } + } + for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){ + if ((*it)->SetThreadPriority(PROCESSOR_PRIORITY) == FAIL) { + FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option."; + return; + } + } + for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){ + if ((*it)->SetThreadPriority(STREAMER_PRIORITY) == FAIL) { + FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option."; + return; + } + } + struct sched_param tcp_param; + tcp_param.sched_priority = TCP_PRIORITY; + if (pthread_setschedparam(pthread_self(),5 , &tcp_param) != EPERM) { + FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option."; + return; + } + + ostringstream osfn; + osfn << "Priorities set - " + "TCP:"<< TCP_PRIORITY << + ", Listener:" << LISTENER_PRIORITY << + ", Processor:" << PROCESSOR_PRIORITY; + if (dataStreamEnable) + osfn << ", Streamer:" << STREAMER_PRIORITY; + + FILE_LOG(logINFO) << osfn.str(); +} + + int UDPStandardImplementation::SetupFifoStructure() { - - - //recalculate number of jobs & fifodepth, return if no change if ((myDetectorType == GOTTHARD) || (myDetectorType == PROPIX)) { @@ -747,6 +761,7 @@ int UDPStandardImplementation::SetupFifoStructure() { //set the listener & dataprocessor threads to point to the right fifo if(listener.size())listener[i]->SetFifo(fifo[i]); if(dataProcessor.size())dataProcessor[i]->SetFifo(fifo[i]); + if(dataStreamer.size())dataStreamer[i]->SetFifo(fifo[i]); } FILE_LOG (logINFO) << "Fifo structure(s) reconstructed"; @@ -756,18 +771,21 @@ int UDPStandardImplementation::SetupFifoStructure() { void UDPStandardImplementation::ResetParametersforNewMeasurement() { + Listener::ResetRunningMask(); + DataProcessor::ResetRunningMask(); + DataStreamer::ResetRunningMask(); for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) (*it)->ResetParametersforNewMeasurement(); for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) (*it)->ResetParametersforNewMeasurement(); + for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) + (*it)->ResetParametersforNewMeasurement(); } int UDPStandardImplementation::CreateUDPSockets() { - - bool error = false; for (unsigned int i = 0; i < listener.size(); ++i) if (listener[i]->CreateUDPSockets() == FAIL) { @@ -785,7 +803,6 @@ int UDPStandardImplementation::CreateUDPSockets() { int UDPStandardImplementation::SetupWriter() { - bool error = false; for (unsigned int i = 0; i < dataProcessor.size(); ++i) if (dataProcessor[i]->CreateNewFile(tengigaEnable, @@ -805,8 +822,6 @@ int UDPStandardImplementation::SetupWriter() { void UDPStandardImplementation::StartRunning() { - - //set running mask and post semaphore to start the inner loop in execution thread for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) { (*it)->StartRunning(); @@ -816,4 +831,8 @@ void UDPStandardImplementation::StartRunning() { (*it)->StartRunning(); (*it)->Continue(); } + for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){ + (*it)->StartRunning(); + (*it)->Continue(); + } }