included json example with dummy values

This commit is contained in:
Dhanya Maliakal 2016-09-16 17:19:55 +02:00
parent e9a3310042
commit 57e741c36c
2 changed files with 79 additions and 11 deletions

View File

@ -18,6 +18,12 @@ include_directories(
../slsDetectorCalibration ../slsDetectorCalibration
) )
add_library(zmq STATIC IMPORTED )
set_target_properties(zmq PROPERTIES
IMPORTED_LOCATION ${CMAKE_CURRENT_SOURCE_DIR}/include/libzmq.a
)
add_library(slsReceiverStatic STATIC add_library(slsReceiverStatic STATIC
${SOURCES} ${SOURCES}
${HEADERS} ${HEADERS}
@ -45,4 +51,6 @@ set_target_properties(slsReceiver PROPERTIES
target_link_libraries(slsReceiver target_link_libraries(slsReceiver
slsReceiverShared slsReceiverShared
pthread pthread
zmq
rt
) )

View File

@ -16,9 +16,10 @@
#include <iostream> #include <iostream>
#include <string.h> #include <string.h>
#include <stdint.h> #include <stdint.h>
#include <ctime> #include <time.h>
#include <zmq.h> //zmq #include <zmq.h> //zmq
#include <sstream>
using namespace std; using namespace std;
@ -1669,7 +1670,7 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
//set current thread value index //set current thread value index
int ithread = currentThreadIndex; int ithread = currentThreadIndex;
struct timespec begin,end;
// server address to bind // server address to bind
char hostName[100] = "tcp://127.0.0.1:"; char hostName[100] = "tcp://127.0.0.1:";
@ -1686,22 +1687,24 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
memset(buffer,0xFF,oneframesize); memset(buffer,0xFF,oneframesize);
int bufferoffset = 0; int bufferoffset = 0;
int size = 0; int size = 0;
int offset=0; int offset = 0;
int currentfnum = 0; int currentfnum = -1;
uint64_t fnum = 0; uint64_t fnum = 0;
uint32_t pnum = 0; uint32_t pnum = 0;
void *context = zmq_ctx_new(); bool randomSendNow = true;
// create a publisher
void *zmqsocket = zmq_socket(context, ZMQ_PUSH);
// bind
zmq_bind(zmqsocket,hostName);
void *context = zmq_ctx_new();
void *zmqsocket = zmq_socket(context, ZMQ_PUSH); // create a publisher
zmq_bind(zmqsocket,hostName); // bind
//let calling function know thread started and obtained current (after sockets created) //let calling function know thread started and obtained current (after sockets created)
if(!zmqThreadStarted) if(!zmqThreadStarted)
zmqThreadStarted = true; zmqThreadStarted = true;
currentfnum = -1; const char *type = "float64";
const char *shape= "[1024, 512]";
/* inner loop - loop for each buffer */ /* inner loop - loop for each buffer */
//until mask reset (dummy pcaket got by writer) //until mask reset (dummy pcaket got by writer)
while((1 << ithread) & dataCallbackThreadsMask){ while((1 << ithread) & dataCallbackThreadsMask){
@ -1720,8 +1723,16 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
/*if (checkJoinThread()){for different scans /*if (checkJoinThread()){for different scans
break; break;
}*/ }*/
ostringstream header;
header << "{\"htype\":[\"chunk-1.0\"], "
<< "\"type\":" << "\"" << type << "\", "
<< "\"shape\":" << shape
<< "}";
//send header
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
sleep(1);
//send data
zmq_send (zmqsocket, "end", 3, 0); zmq_send (zmqsocket, "end", 3, 0);
//cprintf(BLUE,"sent done\n");
pthread_mutex_lock(&statusMutex); pthread_mutex_lock(&statusMutex);
dataCallbackThreadsMask^=(1<<ithread); dataCallbackThreadsMask^=(1<<ithread);
@ -1729,10 +1740,25 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
continue; continue;
} }
//random and the timer is on (randomSendNow is false)
if(!frameToGuiFrequency){
if(!randomSendNow){
clock_gettime(CLOCK_REALTIME, &end);
#ifdef DEBUG
cprintf(BLUE,"%d Elapsed time:%f seconds\n",ithread,( end.tv_sec - begin.tv_sec ) + ( end.tv_nsec - begin.tv_nsec ) / 1000000000.0);
#endif
//still less than 250 ms, keep waiting
if((( end.tv_sec - begin.tv_sec ) + ( end.tv_nsec - begin.tv_nsec ) / 1000000000.0) < 0.250)/**fixed 250 ms*/
continue;
//done with timer, look into data
randomSendNow = true;
}
}
size = guiNumPackets[ithread]*onePacketSize; size = guiNumPackets[ithread]*onePacketSize;
offset=0; offset=0;
//copy packet by packet -getting rid of headers, -in the right order(padding missing packets)
while(offset < size){ while(offset < size){
//until getting frame number is not error //until getting frame number is not error
@ -1753,7 +1779,24 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
if(pnum == packetsPerFrame){ if(pnum == packetsPerFrame){
memcpy(buffer+((pnum-1)*oneDataSize), latestData[ithread]+offset+8,oneDataSize); memcpy(buffer+((pnum-1)*oneDataSize), latestData[ithread]+offset+8,oneDataSize);
offset+= onePacketSize; offset+= onePacketSize;
ostringstream header;
header << "{\"htype\":[\"chunk-1.0\"], "
<< "\"type\":" << "\"" << type << "\", "
<< "\"shape\":" << shape
<< "}";
//send header
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
sleep(1);
//send data
zmq_send(zmqsocket, buffer, oneframesize, 0); zmq_send(zmqsocket, buffer, oneframesize, 0);
#ifdef DEBUG
cprintf(BLUE,"%d sent (last packet)\n",ithread);
#endif
//start clock after sending
if(!frameToGuiFrequency){
randomSendNow = false;
clock_gettime(CLOCK_REALTIME, &begin);
}
memset(buffer,0xFF,oneframesize); memset(buffer,0xFF,oneframesize);
currentfnum = -1; currentfnum = -1;
} }
@ -1761,7 +1804,24 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
else { else {
//next frame //next frame
if(fnum > currentfnum){ if(fnum > currentfnum){
ostringstream header;
header << "{\"htype\":[\"chunk-1.0\"], "
<< "\"type\":" << "\"" << type << "\", "
<< "\"shape\":" << shape
<< "}";
//send header
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
sleep(1);
//send data
zmq_send(zmqsocket, buffer, oneframesize, 0); zmq_send(zmqsocket, buffer, oneframesize, 0);
#ifdef DEBUG
cprintf(BLUE,"%d sent (last packet)\n",ithread);
#endif
//start clock after sending
if(!frameToGuiFrequency){
randomSendNow = false;
clock_gettime(CLOCK_REALTIME, &begin);
}
memset(buffer,0xFF,oneframesize); memset(buffer,0xFF,oneframesize);
currentfnum = fnum; currentfnum = fnum;
} }