can set zmqport from receiver, ensured proper destructors, and ctrl c should kill it

This commit is contained in:
Dhanya Maliakal
2017-07-13 12:17:49 +02:00
parent 672c42a20e
commit 39560969f4
22 changed files with 217 additions and 119 deletions

View File

@ -31,12 +31,12 @@ uint64_t DataProcessor::RunningMask(0x0);
pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER;
DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable,
DataProcessor::DataProcessor(int ind, Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable,
void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t, uint8_t, uint8_t,
char*, uint32_t, void*),
void *pDataReadycb) :
ThreadObject(NumberofDataProcessors),
ThreadObject(),
generalData(0),
fifo(f),
file(0),
@ -53,6 +53,8 @@ DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool*
rawDataReadyCallBack(dataReadycb),
pRawDataReady(pDataReadycb)
{
index = ind;
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
ErrorMask ^= (1<<index);

View File

@ -24,8 +24,8 @@ uint64_t DataStreamer::RunningMask(0x0);
pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER;
DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer, int* sEnable) :
ThreadObject(NumberofDataStreamers),
DataStreamer::DataStreamer(int ind, Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer, int* sEnable) :
ThreadObject(),
generalData(0),
fifo(f),
zmqSocket(0),
@ -40,6 +40,10 @@ DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* tim
firstMeasurementIndex(0),
completeBuffer(0)
{
index = NumberofDataStreamers;
cprintf(RED, "%d: Number of DataStreamers: %d\n", index, NumberofDataStreamers);
//index = ind;
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
ErrorMask ^= (1<<index);
@ -158,11 +162,8 @@ int DataStreamer::SetThreadPriority(int priority) {
}
int DataStreamer::CreateZmqSockets(int* dindex, int* nunits) {
uint32_t portnum = DEFAULT_ZMQ_PORTNO + ((*dindex) * (*nunits) + index);
//using userReceiver where all receivers in one program (numberofstreamers>*nunits)
if(index >= *nunits)
portnum = DEFAULT_ZMQ_PORTNO + index;
int DataStreamer::CreateZmqSockets(int* nunits, uint32_t port) {
uint32_t portnum = port + index;
zmqSocket = new ZmqSocket(portnum);
if (zmqSocket->IsError()) {

View File

@ -14,14 +14,15 @@ using namespace std;
int Fifo::NumberofFifoClassObjects(0);
Fifo::Fifo(uint32_t fifoItemSize, uint32_t fifoDepth, bool &success):
Fifo::Fifo(int ind, uint32_t fifoItemSize, uint32_t fifoDepth, bool &success):
index(ind),
memory(0),
fifoBound(0),
fifoFree(0),
fifoStream(0),
status_fifoBound(0){
FILE_LOG (logDEBUG) << __AT__ << " called";
index = NumberofFifoClassObjects++;
NumberofFifoClassObjects++;
if(CreateFifos(fifoItemSize, fifoDepth) == FAIL)
success = false;
}

View File

@ -27,8 +27,8 @@ uint64_t Listener::RunningMask(0x0);
pthread_mutex_t Listener::Mutex = PTHREAD_MUTEX_INITIALIZER;
Listener::Listener(detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr) :
ThreadObject(NumberofListeners),
Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr) :
ThreadObject(),
generalData(0),
fifo(f),
myDetectorType(dtype),
@ -50,6 +50,7 @@ Listener::Listener(detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno,
carryOverPacket(0),
listeningPacket(0)
{
index = ind;
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);

View File

@ -12,8 +12,8 @@ using namespace std;
ThreadObject::ThreadObject(int ind):
index(ind),
ThreadObject::ThreadObject():
index(0),
alive(false),
killThread(false),
thread(0)

View File

@ -6,6 +6,7 @@
#include "UDPBaseImplementation.h"
#include "genericSocket.h"
#include "ZmqSocket.h"
#include <sys/stat.h> // stat
#include <iostream>
@ -77,6 +78,7 @@ void UDPBaseImplementation::initializeMembers(){
frameToGuiFrequency = 0;
frameToGuiTimerinMS = DEFAULT_STREAMING_TIMER;
dataStreamEnable = false;
streamingPort = 0;
}
UDPBaseImplementation::~UDPBaseImplementation(){}
@ -203,6 +205,7 @@ slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{ FILE_LOG(lo
int UDPBaseImplementation::getActivate() const{FILE_LOG(logDEBUG) << __AT__ << " starting"; return activated;}
uint32_t UDPBaseImplementation::getStreamingPort() const{FILE_LOG(logDEBUG) << __AT__ << " starting"; return streamingPort;}
/*************************************************************************
* Setters ***************************************************************
@ -523,11 +526,6 @@ void UDPBaseImplementation::abort(){
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
}
void UDPBaseImplementation::closeFiles(){
FILE_LOG(logWARNING) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
}
int UDPBaseImplementation::setActivate(int enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -540,6 +538,17 @@ int UDPBaseImplementation::setActivate(int enable){
return activated;
}
void UDPBaseImplementation::setStreamingPort(const uint32_t i) {
if (streamingPort == 0)
streamingPort = DEFAULT_ZMQ_PORTNO + (detID * ((myDetectorType == EIGER) ? 2 : 1) ); // multiplied by 2 as eiger has 2 ports
else
streamingPort = i;
FILE_LOG(logINFO) << "Streaming Port: " << streamingPort;
}
/***callback functions***/
void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg){
startAcquisitionCallBack=func;

View File

@ -10,6 +10,7 @@
#include "DataProcessor.h"
#include "DataStreamer.h"
#include "Fifo.h"
#include "ZmqSocket.h" //just for the zmq port define
#include <cstdlib> //system
#include <cstring> //strcpy
@ -182,7 +183,8 @@ int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) {
}
int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
int UDPStandardImplementation::setDataStreamEnable(const bool enable) {\
if (dataStreamEnable != enable) {
dataStreamEnable = enable;
@ -194,9 +196,12 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
if (enable) {
bool error = false;
for ( int i = 0; i < numThreads; ++i ) {
dataStreamer.push_back(new DataStreamer(fifo[i], &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS, &shortFrameEnable));
dataStreamer.push_back(new DataStreamer(i, fifo[i], &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS, &shortFrameEnable));
dataStreamer[i]->SetGeneralData(generalData);
if (dataStreamer[i]->CreateZmqSockets(&detID, &numThreads) == FAIL) {
// check again
if (streamingPort == 0)
streamingPort = DEFAULT_ZMQ_PORTNO + (detID * ((myDetectorType == EIGER) ? 2 : 1) ); // multiplied by 2 as eiger has 2 ports
if (dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort) == FAIL) {
error = true;
break;
}
@ -370,8 +375,8 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
//create threads
for ( int i=0; i < numThreads; ++i ) {
listener.push_back(new Listener(myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange));
dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable,
listener.push_back(new Listener(i, myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange));
dataProcessor.push_back(new DataProcessor(i, fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable,
rawDataReadyCallBack,pRawDataReady));
if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) {
FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")";
@ -727,7 +732,7 @@ int UDPStandardImplementation::SetupFifoStructure() {
for ( int i = 0; i < numThreads; i++ ) {
//create fifo structure
bool success = true;
fifo.push_back( new Fifo (
fifo.push_back( new Fifo (i,
(generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize),
fifoDepth, success));
if (!success) {

View File

@ -6,62 +6,85 @@
#include <iostream>
#include <string.h>
#include <signal.h> //SIGINT
#include <signal.h> //SIGINT
#include <cstdlib> //system
#include "utilities.h"
#include "logger.h"
#include <sys/types.h> //wait
#include <sys/wait.h> //wait
using namespace std;
slsReceiverUsers *receiver;
void deleteReceiver(slsReceiverUsers* r){
if(r){delete r;r=0;}
}
bool keeprunning;
void closeFile(int p){
deleteReceiver(receiver);
void sigInterruptHandler(int p){
keeprunning = false;
}
/*
int startAcquisitionCallBack(char* filePath, char* fileName, int fileIndex, int bufferSize, void* context) {
FILE_LOG(logINFO) << "#### startAcquisitionCallBack ####";
FILE_LOG(logINFO) << "* filePath: " << filePath;
FILE_LOG(logINFO) << "* fileName: " << fileName;
FILE_LOG(logINFO) << "* fileIndex: " << fileIndex;
FILE_LOG(logINFO) << "* bufferSize: " << bufferSize;
return 1;
int StartAcq(char* filepath, char* filename, uint64_t fileindex, uint32_t datasize, void*p){
printf("#### StartAcq: filepath:%s filename:%s fileindex:%llu datasize:%u ####\n",
filepath, filename, fileindex, datasize);
cprintf(BLUE, "--StartAcq: returning 0\n");
return 0;
}
void acquisitionFinishedCallBack(int totalFramesCaught, void* context) {
FILE_LOG(logINFO) << "#### acquisitionFinishedCallBack ####";
FILE_LOG(logINFO) << "* totalFramesCaught: " << totalFramesCaught;
void AcquisitionFinished(uint64_t frames, void*p){
cprintf(BLUE, "#### AcquisitionFinished: frames:%llu ####\n",frames);
}
void rawDataReadyCallBack(int currFrameNum, char* dataPointer, int dataSize, FILE* file, char* guiDataPointer, void* context) {
FILE_LOG(logINFO) << "#### rawDataReadyCallBack ####";
FILE_LOG(logINFO) << "* currFrameNum: " << currFrameNum;
FILE_LOG(logINFO) << "* dataSize: " << dataSize;
void GetData(uint64_t frameNumber, uint32_t expLength, uint32_t packetNumber, uint64_t bunchId, uint64_t timestamp,
uint16_t modId, uint16_t xCoord, uint16_t yCoord, uint16_t zCoord, uint32_t debug, uint16_t roundRNumber, uint8_t detType, uint8_t version,
char* datapointer, uint32_t datasize, void* p){
PRINT_IN_COLOR (xCoord,
"#### %d GetData: ####\n"
"frameNumber: %llu\t\texpLength: %u\t\tpacketNumber: %u\t\tbunchId: %llu\t\ttimestamp: %llu\t\tmodId: %u\t\t"
"xCoord: %u\t\tyCoord: %u\t\tzCoord: %u\t\tdebug: %u\t\troundRNumber: %u\t\tdetType: %u\t\t"
"version: %u\t\tfirstbytedata: 0x%x\t\tdatsize: %u\n\n",
xCoord, frameNumber, expLength, packetNumber, bunchId, timestamp, modId,
xCoord, yCoord, zCoord, debug, roundRNumber, detType, version,
((uint8_t)(*((uint8_t*)(datapointer)))), datasize);
}
*/
int main(int argc, char *argv[]) {
//Catch signal SIGINT to close files properly
signal(SIGINT,closeFile);
keeprunning = true;
// Catch signal SIGINT to close files and call destructors properly
struct sigaction sa;
sa.sa_flags=0; // no flags
sa.sa_handler=sigInterruptHandler; // handler function
sigemptyset(&sa.sa_mask); // dont block additional signals during invocation of handler
if (sigaction(SIGINT, &sa, NULL) == -1) {
bprintf(RED, "Could not set handler function for SIGINT\n");
}
// if socket crash, ignores SISPIPE, prevents global signal handler
// subsequent read/write to socket gives error - must handle locally
signal(SIGPIPE, SIG_IGN);
struct sigaction asa;
asa.sa_flags=0; // no flags
asa.sa_handler=SIG_IGN; // handler function
sigemptyset(&asa.sa_mask); // dont block additional signals during invocation of handler
if (sigaction(SIGPIPE, &asa, NULL) == -1) {
bprintf(RED, "Could not set handler function for SIGCHILD\n");
}
//system("setterm -linux term -background white -clear");
int ret = slsReceiverDefs::OK;
receiver = new slsReceiverUsers(argc, argv, ret);
slsReceiverUsers *receiver = new slsReceiverUsers(argc, argv, ret);
if(ret==slsReceiverDefs::FAIL){
deleteReceiver(receiver);
return -1;
delete receiver;
exit(EXIT_FAILURE);
}
@ -107,18 +130,16 @@ int main(int argc, char *argv[]) {
//start tcp server thread
if(receiver->start() == slsReceiverDefs::OK){
FILE_LOG(logDEBUG1) << "DONE!";
string str;
cin>>str;
//wait and look for an exit keyword
while(str.find("exit") == string::npos)
cin>>str;
//stop tcp server thread, stop udp socket
receiver->stop();
if (receiver->start() == slsReceiverDefs::FAIL){
delete receiver;
exit(EXIT_FAILURE);
}
deleteReceiver(receiver);
FILE_LOG(logINFO) << "Ready ... ";
bprintf(GRAY, "\n[ Press \'Ctrl+c\' to exit ]\n");
while(keeprunning);
delete receiver;
FILE_LOG(logINFO) << "Goodbye!";
return 0;
}

View File

@ -135,9 +135,7 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success) {
slsReceiver::~slsReceiver() {
if(udp_interface)
delete udp_interface;
if(tcpipInterface)
if(tcpipInterface)
delete tcpipInterface;
}
@ -152,11 +150,6 @@ void slsReceiver::stop() {
}
void slsReceiver::closeFile(int p) {
tcpipInterface->closeFile(p);
}
int64_t slsReceiver::getReceiverVersion(){
return tcpipInterface->getReceiverVersion();
}

View File

@ -25,6 +25,8 @@ slsReceiverTCPIPInterface::~slsReceiverTCPIPInterface() {
delete mySock;
mySock=NULL;
}
if(receiverBase)
delete receiverBase;
}
slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn):
@ -137,7 +139,7 @@ int slsReceiverTCPIPInterface::start(){
void slsReceiverTCPIPInterface::stop(){
FILE_LOG(logINFO) << "Shutting down UDP Socket";
FILE_LOG(logINFO) << "Shutting down TCP Socket";
killTCPServerThread = 1;
if(mySock) mySock->ShutDownSocket();
FILE_LOG(logDEBUG) << "Socket closed";
@ -147,11 +149,6 @@ void slsReceiverTCPIPInterface::stop(){
void slsReceiverTCPIPInterface::closeFile(int p){
receiverBase->closeFiles();
}
int64_t slsReceiverTCPIPInterface::getReceiverVersion(){
int64_t retval = SVNREV;
retval= (retval <<32) | SVNDATE;
@ -220,9 +217,6 @@ void slsReceiverTCPIPInterface::startTCPServer(){
FILE_LOG(logINFO) << "Shutting down UDP Socket";
if(receiverBase){
receiverBase->shutDownUDPSockets();
FILE_LOG(logINFO) << "Closing Files... ";
receiverBase->closeFiles();
}
mySock->exitServer();
@ -230,8 +224,14 @@ void slsReceiverTCPIPInterface::startTCPServer(){
}
//if user entered exit
if(killTCPServerThread)
if(killTCPServerThread) {
if (v != GOODBYE) {
if(receiverBase){
receiverBase->shutDownUDPSockets();
}
}
pthread_exit(NULL);
}
}
}
@ -277,6 +277,7 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) {
case F_SET_RECEIVER_FILE_FORMAT: return "F_SET_RECEIVER_FILE_FORMAT";
case F_SEND_RECEIVER_DETPOSID: return "F_SEND_RECEIVER_DETPOSID";
case F_SEND_RECEIVER_MULTIDETSIZE: return "F_SEND_RECEIVER_MULTIDETSIZE";
case F_SET_RECEIVER_STREAMING_PORT: return "F_SET_RECEIVER_STREAMING_PORT";
default: return "Unknown Function";
}
}
@ -322,7 +323,7 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_SET_RECEIVER_FILE_FORMAT] = &slsReceiverTCPIPInterface::set_file_format;
flist[F_SEND_RECEIVER_DETPOSID] = &slsReceiverTCPIPInterface::set_detector_posid;
flist[F_SEND_RECEIVER_MULTIDETSIZE] = &slsReceiverTCPIPInterface::set_multi_detector_size;
flist[F_SET_RECEIVER_STREAMING_PORT] = &slsReceiverTCPIPInterface::set_streaming_port;
#ifdef VERYVERBOSE
for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) {
FILE_LOG(logINFO) << "function fnum: " << i << " (" << getFunctionName((enum recFuncs)i) << ") located at " << (unsigned int)flist[i];
@ -670,6 +671,12 @@ int slsReceiverTCPIPInterface::send_update() {
#endif
mySock->SendDataOnly(&ind,sizeof(ind));
// streaming port
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getStreamingPort();
#endif
mySock->SendDataOnly(&ind,sizeof(ind));
if (!lockStatus)
strcpy(mySock->lastClientIP,mySock->thisClientIP);
@ -2305,3 +2312,55 @@ int slsReceiverTCPIPInterface::set_multi_detector_size() {
}
int slsReceiverTCPIPInterface::set_streaming_port() {
ret = OK;
memset(mess, 0, sizeof(mess));
int port = -1;
int retval = -1;
// receive arguments
if (mySock->ReceiveDataOnly(&port,sizeof(port)) < 0 )
return printSocketReadError();
// execute action
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (receiverBase == NULL)
invalidReceiverObject();
else {
// set
if(port >= 0) {
if (mySock->differentClients && lockStatus)
receiverlocked();
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
receiverBase->setStreamingPort(port);
}
}
//get
retval=receiverBase->getStreamingPort();
if(port > 0 && retval != port) { //if port = 0, its actual value calculated
ret = FAIL;
strcpy(mess, "Could not set streaming port\n");
FILE_LOG(logERROR) << "Warning: " << mess;
}
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "streaming port:" << retval;
#endif
if (ret == OK && mySock->differentClients)
ret = FORCE_UPDATE;
// send answer
mySock->SendDataOnly(&ret,sizeof(ret));
if (ret == FAIL)
mySock->SendDataOnly(mess,sizeof(mess));
mySock->SendDataOnly(&retval,sizeof(retval));
// return ok/fail
return ret;
}

View File

@ -17,10 +17,6 @@ void slsReceiverUsers::stop() {
receiver->stop();
}
void slsReceiverUsers::closeFile(int p) {
receiver->closeFile(p);
}
int64_t slsReceiverUsers::getReceiverVersion(){
return receiver->getReceiverVersion();
}