gui works

This commit is contained in:
Dhanya Maliakal 2017-02-28 13:14:18 +01:00
parent 19460a03f5
commit 2024eb5d6e
2 changed files with 49 additions and 132 deletions

View File

@ -17,14 +17,13 @@ ID: $Id$
#include "postProcessingFuncs.h"
#include "usersFunctions.h"
#include "ThreadPool.h"
#include "ZmqSocket.h"
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <iostream>
#include <string>
#include <zmq.h>
#include <rapidjson/document.h> //to scan json header in zmq stream
using namespace std;
@ -272,11 +271,8 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
getNMods();
getMaxMods();
dataSocketsStarted = false;
for(int i=0;i<MAXDET;++i){
context[i] = NULL;
zmqsocket[i] = NULL;
strcpy(dataSocketServerDetails[i],"");
}
for(int i=0;i<MAXDET;++i)
zmqSocket[i] = 0;
threadpool = 0;
if(createThreadPool() == FAIL)
exit(-1);
@ -284,7 +280,12 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
}
multiSlsDetector::~multiSlsDetector() {
//removeSlsDetector();
//removeSlsDetector();
for(int i=0;i<MAXDET;++i) {
if (zmqSocket[i]) {
delete zmqSocket[i];
}
}
destroyThreadPool();
}
@ -5134,17 +5135,12 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy){
if(destroy){
cprintf(MAGENTA,"Going to destroy data sockets\n");
//close socket
for(int i=0;i<numSockets; ++i){
if(strlen(dataSocketServerDetails[i])){
zmq_disconnect(zmqsocket[i], dataSocketServerDetails[i]);
zmq_close(zmqsocket[i]);
zmq_ctx_destroy(context[i]);
context[i] = NULL;
zmqsocket[i] = NULL;
strcpy(dataSocketServerDetails[i],"");
}
}
for(int i=0;i<MAXDET;++i) {
if (zmqSocket[i]) {
delete zmqSocket[i];
zmqSocket[i] = 0;
}
}
dataSocketsStarted = false;
cout << "Destroyed Receiving Data Socket(s)" << endl;
return OK;
@ -5153,36 +5149,15 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy){
cprintf(MAGENTA,"Going to create data sockets\n");
for(int i=0;i<numSockets; ++i){
//get name of rx_hostname
char rx_hostname[100];
strcpy(dataSocketServerDetails[i],"tcp://");
strcpy(rx_hostname, detectors[i/numSocketsPerDetector]->getReceiver());
cout<<"rx_hostname:"<<rx_hostname<<endl;
//append it (first into ip) to tcp://
if(strchr(rx_hostname,'.')!=NULL)
strcat(dataSocketServerDetails[i],rx_hostname);
else{
//convert hostname to ip
struct hostent *he = gethostbyname(rx_hostname);
if (he == NULL){
cprintf(RED,"ERROR: could not convert receiver hostname to ip\n");
exit(-1);
}else
strcat(dataSocketServerDetails[i],inet_ntoa(*(struct in_addr*)he->h_addr));
uint32_t portnum = DEFAULT_ZMQ_PORTNO +
(i/numSocketsPerDetector)*numSocketsPerDetector + (i%numSocketsPerDetector);
zmqSocket[i] = new ZmqSocket(detectors[i/numSocketsPerDetector]->getReceiver(), portnum);
if (zmqSocket[i]->IsError()) {
cprintf(RED, "Error: Could not create Zmq socket on port %d\n", portnum);
createReceivingDataSockets(true);
return FAIL;
}
//add port
sprintf(dataSocketServerDetails[i],"%s:%d",dataSocketServerDetails[i],DEFAULT_ZMQ_PORTNO +
(i/numSocketsPerDetector)*numSocketsPerDetector + (i%numSocketsPerDetector));//using this instead of i in the offchance, detid doesnt start at 0 (shmget error)
//create context
context[i] = zmq_ctx_new();
//create socket
zmqsocket[i] = zmq_socket(context[i], ZMQ_PULL);
//connect socket
zmq_connect(zmqsocket[i], dataSocketServerDetails[i]);
//int hwmval = 10;
//zmq_setsockopt(zmqsocket[i],ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower//should not drop last packets)
cout << "ZMQ Client[" << i << "] from " << dataSocketServerDetails[i] << endl;
printf("Zmq Client[%d] at %s\n",i, zmqSocket[i]->GetZmqServerAddress());
}
dataSocketsStarted = true;
@ -5198,91 +5173,23 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy){
int multiSlsDetector::getData(const int isocket, const bool masking, int* image, const int size, int &acqIndex, int &frameIndex, int &subframeIndex, string &filename){
int multiSlsDetector::getData(const int isocket, const bool masking, int* image, const int size,
uint64_t &acqIndex, uint64_t &frameIndex, uint64_t &subframeIndex, string &filename){
zmq_msg_t message;
//scan header-------------------------------------------------------------------
zmq_msg_init (&message);
int len = zmq_msg_recv(&message, zmqsocket[isocket], 0);
if (len == -1) {
cprintf(BG_RED,"Could not read header for socket %d\n",isocket);
zmq_msg_close(&message);
cprintf(RED, "%d message null\n",isocket);
if (!zmqSocket[isocket]->ReceiveHeader(isocket, acqIndex, frameIndex, subframeIndex, filename))
return FAIL;
}
// error if you print it
// cout << isocket << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
//cprintf(BLUE,"%d header %d\n",isocket,len);
rapidjson::Document d;
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
#ifdef VERYVERBOSE
// htype is an array of strings
rapidjson::Value::Array htype = d["htype"].GetArray();
for(int i=0; i< htype.Size(); i++)
std::cout << isocket << "htype: " << htype[i].GetString() << std::endl;
// shape is an array of ints
rapidjson::Value::Array shape = d["shape"].GetArray();
cout << isocket << "shape: ";
for(int i=0; i< shape.Size(); i++)
cout << isocket << shape[i].GetInt() << " ";
cout << endl;
cout << isocket << "type: " << d["type"].GetString() << endl;
#endif
if(d["acqIndex"].GetInt()!=-9){ //!isocket &&
acqIndex = d["acqIndex"].GetInt();
frameIndex = d["fIndex"].GetInt();
subframeIndex = d["subfnum"].GetInt();
filename = d["fname"].GetString();
#ifdef VERYVERBOSE
cout << "Acquisition index: " << acqIndex << endl;
cout << "Frame index: " << frameIndex << endl;
cout << "Subframe index: " << subframeIndex << endl;
cout << "File name: " << filename << endl;
#endif
if(frameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
}
// close the message
zmq_msg_close(&message);
//scan data-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket[isocket], 0);
//cprintf(BLUE,"%d data %d\n",isocket,len);
//end of socket ("end")
if(len == 3){
//cprintf(RED,"%d Received end of acquisition\n", isocket);
if (!zmqSocket[isocket]->ReceiveData(isocket, image, size))
return FAIL;
}
//crappy image
if (len < size ) {
cprintf(RED,"Received weird packet size %d in socket for %d\n", len, isocket);
memset((char*)image,0xFF,size);
}
//actual image
else{
//actual data
//cprintf(BLUE,"%d actual dataaa\n",isocket);
memcpy((char*)image,(char*)zmq_msg_data(&message),size);
//jungfrau masking adcval
if(masking){
int snel = size/sizeof(int);
for(unsigned int i=0;i<snel;++i){
image[i] = (image[i] & 0x3FFF3FFF);
}
//jungfrau masking adcval
if(masking){
int snel = size/sizeof(int);
for(unsigned int i=0;i<snel;++i){
image[i] = (image[i] & 0x3FFF3FFF);
}
}
zmq_msg_close(&message); // close the message
return OK;
}
@ -5311,9 +5218,9 @@ void multiSlsDetector::readFrameFromReceiver(){
}
//gui variables
int currentAcquisitionIndex = -1;
int currentFrameIndex = -1;
int currentSubFrameIndex = -1;
uint64_t currentAcquisitionIndex = -1;
uint64_t currentFrameIndex = -1;
uint64_t currentSubFrameIndex = -1;
string currentFileName = "";
//getting sls values

View File

@ -17,6 +17,7 @@ ID: $Id$
class slsDetector;
class ThreadPool;
class ZmqSocket;
//#include "sls_detector_defs.h"
@ -1429,14 +1430,23 @@ private:
/**
* Gets data from socket
* @param isocket socket index
* @param masking if masking required (jungfrau)
* @param image image buffer
* @param size size of image
* @param acqIndex address of acquisition index
* @param frameIndex address of frame index
* @param subframeIndex address of subframe index
* @param filename address of file name
*/
int getData(const int isocket, const bool masking, int* image, const int size, int &acqIndex, int &frameIndex, int &subframeIndex, string &filename);
int getData(const int isocket, const bool masking, int* image, const int size,
uint64_t &acqIndex, uint64_t &frameIndex, uint64_t &subframeIndex, string &filename);
/** Ensures if sockets created successfully */
bool dataSocketsStarted;
void *context[MAXDET];
void *zmqsocket[MAXDET];
char dataSocketServerDetails[MAXDET][100];
/** ZMQ Socket - Receiver to Client */
ZmqSocket* zmqSocket[MAXDET];
protected: