From 39560969f44191fa94d2043de1100c48e3abced2 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Thu, 13 Jul 2017 12:17:49 +0200 Subject: [PATCH] can set zmqport from receiver, ensured proper destructors, and ctrl c should kill it --- slsReceiverSoftware/include/DataProcessor.h | 3 +- slsReceiverSoftware/include/DataStreamer.h | 7 +- slsReceiverSoftware/include/Fifo.h | 3 +- slsReceiverSoftware/include/Listener.h | 3 +- slsReceiverSoftware/include/ThreadObject.h | 3 +- .../include/UDPBaseImplementation.h | 19 +++- slsReceiverSoftware/include/UDPInterface.h | 18 ++-- slsReceiverSoftware/include/slsReceiver.h | 5 - .../include/slsReceiverTCPIPInterface.h | 7 +- .../include/slsReceiverUsers.h | 3 - .../include/sls_receiver_funcs.h | 1 + slsReceiverSoftware/src/DataProcessor.cpp | 6 +- slsReceiverSoftware/src/DataStreamer.cpp | 15 +-- slsReceiverSoftware/src/Fifo.cpp | 5 +- slsReceiverSoftware/src/Listener.cpp | 5 +- slsReceiverSoftware/src/ThreadObject.cpp | 4 +- .../src/UDPBaseImplementation.cpp | 19 +++- .../src/UDPStandardImplementation.cpp | 17 ++-- slsReceiverSoftware/src/main.cpp | 99 +++++++++++-------- slsReceiverSoftware/src/slsReceiver.cpp | 9 +- .../src/slsReceiverTCPIPInterface.cpp | 81 ++++++++++++--- slsReceiverSoftware/src/slsReceiverUsers.cpp | 4 - 22 files changed, 217 insertions(+), 119 deletions(-) diff --git a/slsReceiverSoftware/include/DataProcessor.h b/slsReceiverSoftware/include/DataProcessor.h index e29999896..d485b4f90 100644 --- a/slsReceiverSoftware/include/DataProcessor.h +++ b/slsReceiverSoftware/include/DataProcessor.h @@ -24,6 +24,7 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { /** * Constructor * Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofDataProcessors + * @param ind self index * @param f address of Fifo pointer * @param ftype pointer to file format type * @param fwenable pointer to file writer enable @@ -31,7 +32,7 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { * @param dataReadycb pointer to data ready call back function * @param pDataReadycb pointer to arguments of data ready call back function */ - DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, + DataProcessor(int ind, Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t, uint8_t, uint8_t, char*, uint32_t, void*), void *pDataReadycb); diff --git a/slsReceiverSoftware/include/DataStreamer.h b/slsReceiverSoftware/include/DataStreamer.h index 94fbb0640..7926d7d7c 100644 --- a/slsReceiverSoftware/include/DataStreamer.h +++ b/slsReceiverSoftware/include/DataStreamer.h @@ -20,13 +20,14 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { /** * Constructor * Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofDataStreamers + * @param ind self index * @param f address of Fifo pointer * @param dr pointer to dynamic range * @param freq pointer to streaming frequency * @param timer pointer to timer if streaming frequency is random * @param sEnable pointer to short frame enable */ - DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer, int* sEnable); + DataStreamer(int ind, Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer, int* sEnable); /** * Destructor @@ -100,11 +101,11 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { /** * Creates Zmq Sockets - * @param dindex pointer to detector index * @param nunits pointer to number of theads/ units per detector + * @param port streaming port start index * @return OK or FAIL */ - int CreateZmqSockets(int* dindex, int* nunits); + int CreateZmqSockets(int* nunits, uint32_t port); /** * Shuts down and deletes Zmq Sockets diff --git a/slsReceiverSoftware/include/Fifo.h b/slsReceiverSoftware/include/Fifo.h index 92305f58c..846a1ecdb 100644 --- a/slsReceiverSoftware/include/Fifo.h +++ b/slsReceiverSoftware/include/Fifo.h @@ -20,11 +20,12 @@ class Fifo : private virtual slsReceiverDefs { /** * Constructor * Calls CreateFifos that creates fifos and allocates memory + * @param ind self index * @param fifoItemSize size of each fifo item * @param fifoDepth fifo depth * @param success true if successful, else false */ - Fifo(uint32_t fifoItemSize, uint32_t fifoDepth, bool &success); + Fifo(int ind, uint32_t fifoItemSize, uint32_t fifoDepth, bool &success); /** * Destructor diff --git a/slsReceiverSoftware/include/Listener.h b/slsReceiverSoftware/include/Listener.h index 8a0e57d2c..57d059c05 100644 --- a/slsReceiverSoftware/include/Listener.h +++ b/slsReceiverSoftware/include/Listener.h @@ -21,6 +21,7 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { /** * Constructor * Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofListerners + * @param ind self index * @param dtype detector type * @param f address of Fifo pointer * @param s pointer to receiver status @@ -30,7 +31,7 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { * @param nf pointer to number of images to catch * @param dr pointer to dynamic range */ - Listener(detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr); + Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr); /** * Destructor diff --git a/slsReceiverSoftware/include/ThreadObject.h b/slsReceiverSoftware/include/ThreadObject.h index 5b5639cea..29fac7786 100644 --- a/slsReceiverSoftware/include/ThreadObject.h +++ b/slsReceiverSoftware/include/ThreadObject.h @@ -19,9 +19,8 @@ class ThreadObject : private virtual slsReceiverDefs { public: /** * Constructor - * @param ind self index */ - ThreadObject(int ind); + ThreadObject(); /** * Destructor diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index bde8874e9..068a851a5 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -241,6 +241,12 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter */ int getActivate() const; + /** + * Get Streaming Port + * @return streaming port + */ + uint32_t getStreamingPort() const; + /************************************************************************* @@ -497,11 +503,6 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter */ void abort(); //FIXME: needed, isn't stopReceiver enough? - /** - * Closes all files - */ - void closeFiles(); - /** * Activate / Deactivate Receiver * If deactivated, receiver will write dummy packets 0xFF @@ -509,6 +510,12 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter */ int setActivate(int enable = -1); + /** + * Set streaming port + * @param i streaming port + */ + void setStreamingPort(const uint32_t i); + //***callback functions*** /** * Call back for start acquisition @@ -629,6 +636,8 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter /** Data Stream Enable from Receiver */ bool dataStreamEnable; static const int DEFAULT_STREAMING_TIMER = 500; + /** streaming port */ + uint32_t streamingPort; //***callback parameters*** diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index 81f8a7faa..e11b869b3 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -20,7 +20,6 @@ class UDPInterface { - /* abstract class that defines the UDP interface of an sls detector data receiver. * @@ -302,6 +301,12 @@ class UDPInterface { */ virtual int getActivate() const = 0; + /** + * Get Streaming Port + * @return streaming port + */ + virtual uint32_t getStreamingPort() const = 0; + /************************************************************************* * Setters *************************************************************** @@ -555,11 +560,6 @@ class UDPInterface { */ virtual void abort() = 0; //FIXME: needed, isnt stopReceiver enough? - /** - * Closes all files - */ - virtual void closeFiles() = 0; - /** * Activate / Deactivate Receiver * If deactivated, receiver will write dummy packets 0xFF @@ -567,6 +567,12 @@ class UDPInterface { */ virtual int setActivate(int enable = -1) = 0; + /** + * Set streaming port + * @param i streaming port + */ + virtual void setStreamingPort(const uint32_t i) = 0; + //***callback functions*** /** diff --git a/slsReceiverSoftware/include/slsReceiver.h b/slsReceiverSoftware/include/slsReceiver.h index 25741401d..4f86ce879 100644 --- a/slsReceiverSoftware/include/slsReceiver.h +++ b/slsReceiverSoftware/include/slsReceiver.h @@ -46,11 +46,6 @@ class slsReceiver : private virtual slsReceiverDefs { */ void stop(); - /** - * Close File and exits receiver server - */ - void closeFile(int p); - /** * get get Receiver Version \returns id diff --git a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h index 29f897144..a899c3c03 100644 --- a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h +++ b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h @@ -51,10 +51,6 @@ class slsReceiverTCPIPInterface : private virtual slsReceiverDefs { void stop(); - - /** Close all threaded Files and exit */ - void closeFile(int p); - /** gets version */ int64_t getReceiverVersion(); @@ -265,6 +261,9 @@ class slsReceiverTCPIPInterface : private virtual slsReceiverDefs { /** set multi detector size */ int set_multi_detector_size(); + /** set streaming port */ + int set_streaming_port(); + /** detector type */ diff --git a/slsReceiverSoftware/include/slsReceiverUsers.h b/slsReceiverSoftware/include/slsReceiverUsers.h index ac8c4a560..c2dd0bc0d 100644 --- a/slsReceiverSoftware/include/slsReceiverUsers.h +++ b/slsReceiverSoftware/include/slsReceiverUsers.h @@ -29,9 +29,6 @@ public: /** Destructor */ ~slsReceiverUsers(); - /** Close File and exits receiver server */ - void closeFile(int p); - /** * starts listening on the TCP port for client comminication \return 0 for success or 1 for FAIL in creating TCP server diff --git a/slsReceiverSoftware/include/sls_receiver_funcs.h b/slsReceiverSoftware/include/sls_receiver_funcs.h index 39bbd20c5..a8574edd2 100644 --- a/slsReceiverSoftware/include/sls_receiver_funcs.h +++ b/slsReceiverSoftware/include/sls_receiver_funcs.h @@ -60,6 +60,7 @@ enum recFuncs{ F_SEND_RECEIVER_DETPOSID, /** < sets the detector position id in the reveiver */ F_SEND_RECEIVER_MULTIDETSIZE, /** < sets the multi detector size to the receiver */ + F_SET_RECEIVER_STREAMING_PORT, /** < sets the receiver streaming port */ /* Always append functions hereafter!!! */ diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index 13d83ca4e..8c5d61946 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -31,12 +31,12 @@ uint64_t DataProcessor::RunningMask(0x0); pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER; -DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, +DataProcessor::DataProcessor(int ind, Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t, uint8_t, uint8_t, char*, uint32_t, void*), void *pDataReadycb) : - ThreadObject(NumberofDataProcessors), + ThreadObject(), generalData(0), fifo(f), file(0), @@ -53,6 +53,8 @@ DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* rawDataReadyCallBack(dataReadycb), pRawDataReady(pDataReadycb) { + index = ind; + if(ThreadObject::CreateThread()){ pthread_mutex_lock(&Mutex); ErrorMask ^= (1<*nunits) - if(index >= *nunits) - portnum = DEFAULT_ZMQ_PORTNO + index; +int DataStreamer::CreateZmqSockets(int* nunits, uint32_t port) { + uint32_t portnum = port + index; zmqSocket = new ZmqSocket(portnum); if (zmqSocket->IsError()) { diff --git a/slsReceiverSoftware/src/Fifo.cpp b/slsReceiverSoftware/src/Fifo.cpp index d95cb6f27..97e35e215 100644 --- a/slsReceiverSoftware/src/Fifo.cpp +++ b/slsReceiverSoftware/src/Fifo.cpp @@ -14,14 +14,15 @@ using namespace std; int Fifo::NumberofFifoClassObjects(0); -Fifo::Fifo(uint32_t fifoItemSize, uint32_t fifoDepth, bool &success): +Fifo::Fifo(int ind, uint32_t fifoItemSize, uint32_t fifoDepth, bool &success): + index(ind), memory(0), fifoBound(0), fifoFree(0), fifoStream(0), status_fifoBound(0){ FILE_LOG (logDEBUG) << __AT__ << " called"; - index = NumberofFifoClassObjects++; + NumberofFifoClassObjects++; if(CreateFifos(fifoItemSize, fifoDepth) == FAIL) success = false; } diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 4fcf54581..dbcaf4f0a 100644 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -27,8 +27,8 @@ uint64_t Listener::RunningMask(0x0); pthread_mutex_t Listener::Mutex = PTHREAD_MUTEX_INITIALIZER; -Listener::Listener(detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr) : - ThreadObject(NumberofListeners), +Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr) : + ThreadObject(), generalData(0), fifo(f), myDetectorType(dtype), @@ -50,6 +50,7 @@ Listener::Listener(detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno, carryOverPacket(0), listeningPacket(0) { + index = ind; if(ThreadObject::CreateThread()){ pthread_mutex_lock(&Mutex); diff --git a/slsReceiverSoftware/src/ThreadObject.cpp b/slsReceiverSoftware/src/ThreadObject.cpp index 08a7fc4e2..b1a7e5107 100644 --- a/slsReceiverSoftware/src/ThreadObject.cpp +++ b/slsReceiverSoftware/src/ThreadObject.cpp @@ -12,8 +12,8 @@ using namespace std; -ThreadObject::ThreadObject(int ind): - index(ind), +ThreadObject::ThreadObject(): + index(0), alive(false), killThread(false), thread(0) diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index 53d0d3adc..c0165911a 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -6,6 +6,7 @@ #include "UDPBaseImplementation.h" #include "genericSocket.h" +#include "ZmqSocket.h" #include // stat #include @@ -77,6 +78,7 @@ void UDPBaseImplementation::initializeMembers(){ frameToGuiFrequency = 0; frameToGuiTimerinMS = DEFAULT_STREAMING_TIMER; dataStreamEnable = false; + streamingPort = 0; } UDPBaseImplementation::~UDPBaseImplementation(){} @@ -203,6 +205,7 @@ slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{ FILE_LOG(lo int UDPBaseImplementation::getActivate() const{FILE_LOG(logDEBUG) << __AT__ << " starting"; return activated;} +uint32_t UDPBaseImplementation::getStreamingPort() const{FILE_LOG(logDEBUG) << __AT__ << " starting"; return streamingPort;} /************************************************************************* * Setters *************************************************************** @@ -523,11 +526,6 @@ void UDPBaseImplementation::abort(){ FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; } -void UDPBaseImplementation::closeFiles(){ - FILE_LOG(logWARNING) << __AT__ << " doing nothing..."; - FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; -} - int UDPBaseImplementation::setActivate(int enable){ FILE_LOG(logDEBUG) << __AT__ << " starting"; @@ -540,6 +538,17 @@ int UDPBaseImplementation::setActivate(int enable){ return activated; } +void UDPBaseImplementation::setStreamingPort(const uint32_t i) { + + if (streamingPort == 0) + streamingPort = DEFAULT_ZMQ_PORTNO + (detID * ((myDetectorType == EIGER) ? 2 : 1) ); // multiplied by 2 as eiger has 2 ports + else + streamingPort = i; + + FILE_LOG(logINFO) << "Streaming Port: " << streamingPort; +} + + /***callback functions***/ void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg){ startAcquisitionCallBack=func; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index a161f64bf..18d8e11b4 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -10,6 +10,7 @@ #include "DataProcessor.h" #include "DataStreamer.h" #include "Fifo.h" +#include "ZmqSocket.h" //just for the zmq port define #include //system #include //strcpy @@ -182,7 +183,8 @@ int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) { } -int UDPStandardImplementation::setDataStreamEnable(const bool enable) { +int UDPStandardImplementation::setDataStreamEnable(const bool enable) {\ + if (dataStreamEnable != enable) { dataStreamEnable = enable; @@ -194,9 +196,12 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) { if (enable) { bool error = false; for ( int i = 0; i < numThreads; ++i ) { - dataStreamer.push_back(new DataStreamer(fifo[i], &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS, &shortFrameEnable)); + dataStreamer.push_back(new DataStreamer(i, fifo[i], &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS, &shortFrameEnable)); dataStreamer[i]->SetGeneralData(generalData); - if (dataStreamer[i]->CreateZmqSockets(&detID, &numThreads) == FAIL) { + // check again + if (streamingPort == 0) + streamingPort = DEFAULT_ZMQ_PORTNO + (detID * ((myDetectorType == EIGER) ? 2 : 1) ); // multiplied by 2 as eiger has 2 ports + if (dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort) == FAIL) { error = true; break; } @@ -370,8 +375,8 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) { //create threads for ( int i=0; i < numThreads; ++i ) { - listener.push_back(new Listener(myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange)); - dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable, + listener.push_back(new Listener(i, myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange)); + dataProcessor.push_back(new DataProcessor(i, fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable, rawDataReadyCallBack,pRawDataReady)); if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) { FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")"; @@ -727,7 +732,7 @@ int UDPStandardImplementation::SetupFifoStructure() { for ( int i = 0; i < numThreads; i++ ) { //create fifo structure bool success = true; - fifo.push_back( new Fifo ( + fifo.push_back( new Fifo (i, (generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize), fifoDepth, success)); if (!success) { diff --git a/slsReceiverSoftware/src/main.cpp b/slsReceiverSoftware/src/main.cpp index d64771832..f8b613436 100644 --- a/slsReceiverSoftware/src/main.cpp +++ b/slsReceiverSoftware/src/main.cpp @@ -6,62 +6,85 @@ #include #include -#include //SIGINT +#include //SIGINT #include //system #include "utilities.h" #include "logger.h" + +#include //wait +#include //wait using namespace std; -slsReceiverUsers *receiver; -void deleteReceiver(slsReceiverUsers* r){ - if(r){delete r;r=0;} -} +bool keeprunning; -void closeFile(int p){ - deleteReceiver(receiver); +void sigInterruptHandler(int p){ + keeprunning = false; } /* -int startAcquisitionCallBack(char* filePath, char* fileName, int fileIndex, int bufferSize, void* context) { - FILE_LOG(logINFO) << "#### startAcquisitionCallBack ####"; - FILE_LOG(logINFO) << "* filePath: " << filePath; - FILE_LOG(logINFO) << "* fileName: " << fileName; - FILE_LOG(logINFO) << "* fileIndex: " << fileIndex; - FILE_LOG(logINFO) << "* bufferSize: " << bufferSize; - return 1; +int StartAcq(char* filepath, char* filename, uint64_t fileindex, uint32_t datasize, void*p){ + printf("#### StartAcq: filepath:%s filename:%s fileindex:%llu datasize:%u ####\n", + filepath, filename, fileindex, datasize); + + cprintf(BLUE, "--StartAcq: returning 0\n"); + return 0; } -void acquisitionFinishedCallBack(int totalFramesCaught, void* context) { - FILE_LOG(logINFO) << "#### acquisitionFinishedCallBack ####"; - FILE_LOG(logINFO) << "* totalFramesCaught: " << totalFramesCaught; + +void AcquisitionFinished(uint64_t frames, void*p){ + cprintf(BLUE, "#### AcquisitionFinished: frames:%llu ####\n",frames); } -void rawDataReadyCallBack(int currFrameNum, char* dataPointer, int dataSize, FILE* file, char* guiDataPointer, void* context) { - FILE_LOG(logINFO) << "#### rawDataReadyCallBack ####"; - FILE_LOG(logINFO) << "* currFrameNum: " << currFrameNum; - FILE_LOG(logINFO) << "* dataSize: " << dataSize; + +void GetData(uint64_t frameNumber, uint32_t expLength, uint32_t packetNumber, uint64_t bunchId, uint64_t timestamp, + uint16_t modId, uint16_t xCoord, uint16_t yCoord, uint16_t zCoord, uint32_t debug, uint16_t roundRNumber, uint8_t detType, uint8_t version, + char* datapointer, uint32_t datasize, void* p){ + + PRINT_IN_COLOR (xCoord, + "#### %d GetData: ####\n" + "frameNumber: %llu\t\texpLength: %u\t\tpacketNumber: %u\t\tbunchId: %llu\t\ttimestamp: %llu\t\tmodId: %u\t\t" + "xCoord: %u\t\tyCoord: %u\t\tzCoord: %u\t\tdebug: %u\t\troundRNumber: %u\t\tdetType: %u\t\t" + "version: %u\t\tfirstbytedata: 0x%x\t\tdatsize: %u\n\n", + xCoord, frameNumber, expLength, packetNumber, bunchId, timestamp, modId, + xCoord, yCoord, zCoord, debug, roundRNumber, detType, version, + ((uint8_t)(*((uint8_t*)(datapointer)))), datasize); + } */ + int main(int argc, char *argv[]) { - //Catch signal SIGINT to close files properly - signal(SIGINT,closeFile); + keeprunning = true; + + // Catch signal SIGINT to close files and call destructors properly + struct sigaction sa; + sa.sa_flags=0; // no flags + sa.sa_handler=sigInterruptHandler; // handler function + sigemptyset(&sa.sa_mask); // dont block additional signals during invocation of handler + if (sigaction(SIGINT, &sa, NULL) == -1) { + bprintf(RED, "Could not set handler function for SIGINT\n"); + } + // if socket crash, ignores SISPIPE, prevents global signal handler // subsequent read/write to socket gives error - must handle locally - signal(SIGPIPE, SIG_IGN); + struct sigaction asa; + asa.sa_flags=0; // no flags + asa.sa_handler=SIG_IGN; // handler function + sigemptyset(&asa.sa_mask); // dont block additional signals during invocation of handler + if (sigaction(SIGPIPE, &asa, NULL) == -1) { + bprintf(RED, "Could not set handler function for SIGCHILD\n"); + } - //system("setterm -linux term -background white -clear"); int ret = slsReceiverDefs::OK; - receiver = new slsReceiverUsers(argc, argv, ret); - + slsReceiverUsers *receiver = new slsReceiverUsers(argc, argv, ret); if(ret==slsReceiverDefs::FAIL){ - deleteReceiver(receiver); - return -1; + delete receiver; + exit(EXIT_FAILURE); } @@ -107,18 +130,16 @@ int main(int argc, char *argv[]) { //start tcp server thread - if(receiver->start() == slsReceiverDefs::OK){ - FILE_LOG(logDEBUG1) << "DONE!"; - string str; - cin>>str; - //wait and look for an exit keyword - while(str.find("exit") == string::npos) - cin>>str; - //stop tcp server thread, stop udp socket - receiver->stop(); + if (receiver->start() == slsReceiverDefs::FAIL){ + delete receiver; + exit(EXIT_FAILURE); } - deleteReceiver(receiver); + FILE_LOG(logINFO) << "Ready ... "; + bprintf(GRAY, "\n[ Press \'Ctrl+c\' to exit ]\n"); + while(keeprunning); + + delete receiver; FILE_LOG(logINFO) << "Goodbye!"; return 0; } diff --git a/slsReceiverSoftware/src/slsReceiver.cpp b/slsReceiverSoftware/src/slsReceiver.cpp index b5c3bcd57..79336e2d3 100644 --- a/slsReceiverSoftware/src/slsReceiver.cpp +++ b/slsReceiverSoftware/src/slsReceiver.cpp @@ -135,9 +135,7 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success) { slsReceiver::~slsReceiver() { - if(udp_interface) - delete udp_interface; - if(tcpipInterface) + if(tcpipInterface) delete tcpipInterface; } @@ -152,11 +150,6 @@ void slsReceiver::stop() { } -void slsReceiver::closeFile(int p) { - tcpipInterface->closeFile(p); -} - - int64_t slsReceiver::getReceiverVersion(){ return tcpipInterface->getReceiverVersion(); } diff --git a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp index 18f721f7a..a70e49114 100644 --- a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp +++ b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp @@ -25,6 +25,8 @@ slsReceiverTCPIPInterface::~slsReceiverTCPIPInterface() { delete mySock; mySock=NULL; } + if(receiverBase) + delete receiverBase; } slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn): @@ -137,7 +139,7 @@ int slsReceiverTCPIPInterface::start(){ void slsReceiverTCPIPInterface::stop(){ - FILE_LOG(logINFO) << "Shutting down UDP Socket"; + FILE_LOG(logINFO) << "Shutting down TCP Socket"; killTCPServerThread = 1; if(mySock) mySock->ShutDownSocket(); FILE_LOG(logDEBUG) << "Socket closed"; @@ -147,11 +149,6 @@ void slsReceiverTCPIPInterface::stop(){ -void slsReceiverTCPIPInterface::closeFile(int p){ - receiverBase->closeFiles(); -} - - int64_t slsReceiverTCPIPInterface::getReceiverVersion(){ int64_t retval = SVNREV; retval= (retval <<32) | SVNDATE; @@ -220,9 +217,6 @@ void slsReceiverTCPIPInterface::startTCPServer(){ FILE_LOG(logINFO) << "Shutting down UDP Socket"; if(receiverBase){ receiverBase->shutDownUDPSockets(); - - FILE_LOG(logINFO) << "Closing Files... "; - receiverBase->closeFiles(); } mySock->exitServer(); @@ -230,8 +224,14 @@ void slsReceiverTCPIPInterface::startTCPServer(){ } //if user entered exit - if(killTCPServerThread) + if(killTCPServerThread) { + if (v != GOODBYE) { + if(receiverBase){ + receiverBase->shutDownUDPSockets(); + } + } pthread_exit(NULL); + } } } @@ -277,6 +277,7 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) { case F_SET_RECEIVER_FILE_FORMAT: return "F_SET_RECEIVER_FILE_FORMAT"; case F_SEND_RECEIVER_DETPOSID: return "F_SEND_RECEIVER_DETPOSID"; case F_SEND_RECEIVER_MULTIDETSIZE: return "F_SEND_RECEIVER_MULTIDETSIZE"; + case F_SET_RECEIVER_STREAMING_PORT: return "F_SET_RECEIVER_STREAMING_PORT"; default: return "Unknown Function"; } } @@ -322,7 +323,7 @@ int slsReceiverTCPIPInterface::function_table(){ flist[F_SET_RECEIVER_FILE_FORMAT] = &slsReceiverTCPIPInterface::set_file_format; flist[F_SEND_RECEIVER_DETPOSID] = &slsReceiverTCPIPInterface::set_detector_posid; flist[F_SEND_RECEIVER_MULTIDETSIZE] = &slsReceiverTCPIPInterface::set_multi_detector_size; - + flist[F_SET_RECEIVER_STREAMING_PORT] = &slsReceiverTCPIPInterface::set_streaming_port; #ifdef VERYVERBOSE for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) { FILE_LOG(logINFO) << "function fnum: " << i << " (" << getFunctionName((enum recFuncs)i) << ") located at " << (unsigned int)flist[i]; @@ -670,6 +671,12 @@ int slsReceiverTCPIPInterface::send_update() { #endif mySock->SendDataOnly(&ind,sizeof(ind)); + // streaming port +#ifdef SLS_RECEIVER_UDP_FUNCTIONS + ind=(int)receiverBase->getStreamingPort(); +#endif + mySock->SendDataOnly(&ind,sizeof(ind)); + if (!lockStatus) strcpy(mySock->lastClientIP,mySock->thisClientIP); @@ -2305,3 +2312,55 @@ int slsReceiverTCPIPInterface::set_multi_detector_size() { } + + +int slsReceiverTCPIPInterface::set_streaming_port() { + ret = OK; + memset(mess, 0, sizeof(mess)); + int port = -1; + int retval = -1; + + // receive arguments + if (mySock->ReceiveDataOnly(&port,sizeof(port)) < 0 ) + return printSocketReadError(); + + // execute action +#ifdef SLS_RECEIVER_UDP_FUNCTIONS + if (receiverBase == NULL) + invalidReceiverObject(); + else { + // set + if(port >= 0) { + if (mySock->differentClients && lockStatus) + receiverlocked(); + else if (receiverBase->getStatus() != IDLE) + receiverNotIdle(); + else { + receiverBase->setStreamingPort(port); + } + } + //get + retval=receiverBase->getStreamingPort(); + if(port > 0 && retval != port) { //if port = 0, its actual value calculated + ret = FAIL; + strcpy(mess, "Could not set streaming port\n"); + FILE_LOG(logERROR) << "Warning: " << mess; + } + } +#endif +#ifdef VERYVERBOSE + FILE_LOG(logDEBUG1) << "streaming port:" << retval; +#endif + + if (ret == OK && mySock->differentClients) + ret = FORCE_UPDATE; + + // send answer + mySock->SendDataOnly(&ret,sizeof(ret)); + if (ret == FAIL) + mySock->SendDataOnly(mess,sizeof(mess)); + mySock->SendDataOnly(&retval,sizeof(retval)); + + // return ok/fail + return ret; +} diff --git a/slsReceiverSoftware/src/slsReceiverUsers.cpp b/slsReceiverSoftware/src/slsReceiverUsers.cpp index 5c83caa99..bab16867f 100644 --- a/slsReceiverSoftware/src/slsReceiverUsers.cpp +++ b/slsReceiverSoftware/src/slsReceiverUsers.cpp @@ -17,10 +17,6 @@ void slsReceiverUsers::stop() { receiver->stop(); } -void slsReceiverUsers::closeFile(int p) { - receiver->closeFile(p); -} - int64_t slsReceiverUsers::getReceiverVersion(){ return receiver->getReceiverVersion(); }