diff --git a/slsReceiverSoftware/CMakeLists.txt b/slsReceiverSoftware/CMakeLists.txt index 98f3574b2..79c9b6c73 100644 --- a/slsReceiverSoftware/CMakeLists.txt +++ b/slsReceiverSoftware/CMakeLists.txt @@ -18,6 +18,12 @@ include_directories( ../slsDetectorCalibration ) +add_library(zmq STATIC IMPORTED ) + +set_target_properties(zmq PROPERTIES + IMPORTED_LOCATION ${CMAKE_CURRENT_SOURCE_DIR}/include/libzmq.a +) + add_library(slsReceiverStatic STATIC ${SOURCES} ${HEADERS} @@ -45,4 +51,6 @@ set_target_properties(slsReceiver PROPERTIES target_link_libraries(slsReceiver slsReceiverShared pthread + zmq + rt ) diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 22e9ef00c..b3b81318d 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -16,9 +16,10 @@ #include #include #include -#include +#include #include //zmq +#include using namespace std; @@ -1669,7 +1670,7 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data //set current thread value index int ithread = currentThreadIndex; - + struct timespec begin,end; // server address to bind char hostName[100] = "tcp://127.0.0.1:"; @@ -1686,22 +1687,24 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data memset(buffer,0xFF,oneframesize); int bufferoffset = 0; int size = 0; - int offset=0; - int currentfnum = 0; + int offset = 0; + int currentfnum = -1; uint64_t fnum = 0; uint32_t pnum = 0; - void *context = zmq_ctx_new(); - // create a publisher - void *zmqsocket = zmq_socket(context, ZMQ_PUSH); - // bind - zmq_bind(zmqsocket,hostName); + bool randomSendNow = true; + + void *context = zmq_ctx_new(); + void *zmqsocket = zmq_socket(context, ZMQ_PUSH); // create a publisher + zmq_bind(zmqsocket,hostName); // bind //let calling function know thread started and obtained current (after sockets created) if(!zmqThreadStarted) zmqThreadStarted = true; - currentfnum = -1; + const char *type = "float64"; + const char *shape= "[1024, 512]"; + /* inner loop - loop for each buffer */ //until mask reset (dummy pcaket got by writer) while((1 << ithread) & dataCallbackThreadsMask){ @@ -1720,8 +1723,16 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data /*if (checkJoinThread()){for different scans break; }*/ + ostringstream header; + header << "{\"htype\":[\"chunk-1.0\"], " + << "\"type\":" << "\"" << type << "\", " + << "\"shape\":" << shape + << "}"; + //send header + zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE); + sleep(1); + //send data zmq_send (zmqsocket, "end", 3, 0); - //cprintf(BLUE,"sent done\n"); pthread_mutex_lock(&statusMutex); dataCallbackThreadsMask^=(1< currentfnum){ + ostringstream header; + header << "{\"htype\":[\"chunk-1.0\"], " + << "\"type\":" << "\"" << type << "\", " + << "\"shape\":" << shape + << "}"; + //send header + zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE); + sleep(1); + //send data zmq_send(zmqsocket, buffer, oneframesize, 0); +#ifdef DEBUG + cprintf(BLUE,"%d sent (last packet)\n",ithread); +#endif + //start clock after sending + if(!frameToGuiFrequency){ + randomSendNow = false; + clock_gettime(CLOCK_REALTIME, &begin); + } memset(buffer,0xFF,oneframesize); currentfnum = fnum; }