Files
cpp/include/zbsread.h
2021-03-16 09:30:56 +01:00

1767 lines
75 KiB
C++

#ifndef ZBSREAD_H
#define ZBSREAD_H
#include <inttypes.h>
#include <zmq.h>
#include <assert.h>
#include <signal.h>
#if HAVE_JSON
#include <json/json.h>
#endif
#include <zbsDataHolders.h>
#include <zbsdtHelper.h>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace CAFE_BSHELPER;
//Preferred to BitsetV because of speed
IntegerBitset<unsigned char> bsetUChar;
IntegerBitset<char> bsetChar;
IntegerBitset<unsigned short> bsetUShort;
IntegerBitset<short> bsetShort;
IntegerBitset<unsigned int> bsetUInt;
IntegerBitset<int> bsetInt;
IntegerBitset<unsigned long long> bsetULongLong;
IntegerBitset<long long> bsetLongLong;
//IntegerBitsetV<unsigned char> bsetUCharV;
//IntegerBitsetV<char> bsetCharV;
//IntegerBitsetV<unsigned short> bsetUShortV;
//IntegerBitsetV<short> bsetShortV;
//IntegerBitsetV<unsigned int> bsetUIntV;
//IntegerBitsetV<int> bsetIntV;
//IntegerBitsetV<unsigned long long> bsetULongLongV;
//IntegerBitsetV<long long> bsetLongLongV;
FloatBitset<float> bsetFloat;
FloatBitset<double> bsetDouble;
//FloatBitsetV<float> bsetFloatV;
//FloatBitsetV<double> bsetDoubleV;
bool bsdtInsertFlag=false;
bool dataHeaderReceived=false;
unsigned short inDumpFlag=2;
std::string hashIs="";
std::string hashOriginal="";
unsigned short hashOriginalFlag=0;
bool fillBSPV=false;
std::vector<std::string> bsPV;
bool fill_bs_read_PV=false;
std::vector<std::string> bs_read_PV;
/**
* \brief Receives all message parts from zeromq socket
* \param dbpm in/output: DBPMKeeper object
* \return long ICAFE_NORMAL else ECAFE_error
*/
static void
z_bsread_dbpm (DBPMKeeper & dbpm)
{
if (inDumpFlag==1)
{
std::cout << "zeroMQ socket is busy " << std::endl;
std::cout << "waiting for zmq timeout " << std::endl;
}
//puts ("//START----------------------------------------//");
void * socket = dbpm.subscriber;
#if HAVE_JSON
Json::Value parsedFromString;
Json::Reader reader;
bool parsingSuccessful;
Json::FastWriter fastWriter;
#endif
int64_t more; // Multipart detection
more = 0;
size_t more_size = sizeof (more);
int bsPVIdx=-1;
dbpm.status=ICAFE_NORMAL;
int nZeroSize=0;
//std::cout << "df = " << inDumpFlag << std::endl;
while (inDumpFlag==1)
{
std::cout << "df/ = " << inDumpFlag << std::endl;
std::cout << " sleeping " << std::endl;
}
int subMessage=0;
int nSequentialHeader=0;
while (1)
{
inDumpFlag=1;
//puts ("//WHILE LOOP ----------------------------------------//");
//std::cout << "subMessage " << subMessage << std::endl;
// Process all parts of the message
zmq_msg_t message;
zmq_msg_init (&message);
size_t size = zmq_msg_recv (&message, socket, 0);
//puts ("//MESSAGE RECEIVED ----------------------------------------//");
if (size == -1)
{
std::cout << " Error is " << zmq_errno() << " " << zmq_strerror(zmq_errno()) << std::endl;
//Resource unavailable means that there is nothing to read now
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
//std::cout << "message mulipart --> more == " << more << std::endl;
zmq_msg_close (&message);
//if (!more) {
// dbpm.status=ERRNO_EAGAIN; //ICAFE_ERRNO_BASE +zmq_errno() ;
// std::cout << "EARLY BREAK subMessage total: " << subMessage << std::endl;
// puts ("//------------------------------------END-------------------------------------//");
// break; // Last message
// }
dbpm.status=ICAFE_ERRNO_BASE+zmq_errno() ;
break;
}
else if (size == 0)
{
////std::cout << " Data of Zero SIZE for submessage " << subMessage << std::endl;
++nZeroSize;
//zmq_msg_close (&message);
//dbpm.status=ECAFE_NODATA;
//break;
//Comes in pairs; one for val one for timestamp
if (nZeroSize%2==1)
{
++bsPVIdx;
if (bsPVIdx >0)
{
std::cout << " Data of Zero SIZE for submessage " << subMessage << std::endl;
std::cout << bsPV[bsPVIdx] << std::endl;
}
}
++subMessage;
//continue;
}
else
{
// Dump the message as text or binary
char *data = (char*)zmq_msg_data (&message);
int is_text = 1;
int char_nbr;
//char cmd[5000]="";
for (char_nbr = 0; char_nbr < size; char_nbr++)
if ((unsigned char) data [char_nbr] < 32
|| (unsigned char) data [char_nbr] > 127)
is_text = 0;
//TExt is two header files
//non-text size=8 is X1, Y1, Q1
//non-text size=2 is -VALID
//non-text size=16 is timestamp
/*
printf (" SIZE [%03d] ", size);
if (is_text) {
std::cout << " TEXT +++++++++++ " << std::endl;
}
else {
std::cout << " NOT TEXT +++++++++++ " << std::endl;
}
for (char_nbr = 0; char_nbr < size; char_nbr++) {
if (is_text) {
printf ("%c", data [char_nbr]);
//snprintf(cmd + strlen(cmd), (sizeof cmd) - strlen(cmd), "%c", data [char_nbr]);
}
else {
printf ("%02X", (unsigned char) data [char_nbr]);
//printf ("%d", (unsigned char) data [char_nbr]);
//snprintf(cmd + strlen(cmd), (sizeof cmd) - strlen(cmd), "%d", (unsigned char)data [char_nbr]);
//if (data[0] == '\x7') {
// std::cout << " little endian " << std::endl;
//}
//else {
// std::cout << "big endian " << std::endl;
//}
}
}//for
printf ("\n");
*/
if (is_text)
{
parsingSuccessful=reader.parse(data, parsedFromString);
if (parsingSuccessful)
{
//Json::StyledWriter styledWriter;
//std::cout << "STYLED: --------------------------------" << std::endl;
//std::cout << styledWriter.write(parsedFromString) << std::endl;
//std::cout << "----------------------------------" << std::endl;
//std::cout << parsedFromString["htype"] << std::endl;
if (fastWriter.write(parsedFromString["htype"]).find("bsr_m-1.1") != std::string::npos)
{
++nSequentialHeader;
hashIs=parsedFromString["hash"].asString();
if (hashOriginalFlag==0)
{
std::cout << hashIs << " is different to original/// " << hashOriginal << std::endl;
hashOriginal=hashIs;
++hashOriginalFlag;
fillBSPV=true;
}
if (hashOriginal.compare(hashIs)!=0)
{
std::cout << hashIs << " is different to original " << hashOriginal << std::endl;
hashOriginal=hashIs;
++hashOriginalFlag;
fillBSPV=true;
}
//std::cout << "p id " << parsedFromString["pulse_id"].asUInt64() << std::endl;
dbpm.setPulse_id(parsedFromString["pulse_id"].asUInt64());
//Reset values as a change of hash signifies that data from two pulse ids
//is being sent in one zeromq messages
subMessage=0;
bsPVIdx=-1;
dbpm.status=ICAFE_NORMAL;
nZeroSize=0;
/*
std::cout << "(1)++++++++++++++++++++++++++++++++++++++++MAIN++++++++++++++++++++++++++++++++++++++++++" << std::endl;
std::cout << "hash " << parsedFromString["hash"] << std::endl;
std::cout << "p id " << parsedFromString["pulse_id"].asUInt64() << std::endl;
std::cout << "g ts " << parsedFromString["global_timestamp"] << std::endl;
std::cout << "comp " << parsedFromString["dh_compression"] << std::endl;
std::cout << "sec " << parsedFromString["global_timestamp"]["sec"].asUInt() << std::endl;
std::cout << "nsec " << parsedFromString["global_timestamp"]["ns"].asUInt() << std::endl;
*/
}
else if (fastWriter.write(parsedFromString["htype"]).find("bsr_d-1.1") != std::string::npos)
{
++nSequentialHeader;
if (fillBSPV)
{
bsPV.clear();
bsPV.reserve(dbpm.getNPV());
/*
if (dbpm.getNPV() != parsedFromString["channels"].size() ) {
std::cout << "No of CONFIGURED BPMS: " << dbpm.getNPV()
<< " is diffent to that being channeled " << parsedFromString["channels"].size() << std::endl;
bsPV.reserve( std::max( (size_t) parsedFromString["channels"].size(),dbpm.getNPV()) );
}
*/
//std::cout << "chan " << parsedFromString["channels"] << std::endl;
//std::cout << "No of channels " << parsedFromString["channels"].size() << std::endl;
//std::cout << "(2)++++++++++++++++++++++++++++++++++++++++HEADER++++++++++++++++++++++++++++++++++++++++++" << std::endl;
for (Json::Value::ArrayIndex i=0; i < parsedFromString["channels"].size(); ++ i)
{
/*
std::cout << "name " << parsedFromString["channels"][i]["name"].asString() << std::endl;
std::cout << "enco " << parsedFromString["channels"][i]["encoding"] << std::endl;
std::cout << "type " << parsedFromString["channels"][i]["type"] << std::endl;
*/
bsPV.push_back( (parsedFromString["channels"][i]["name"]).asString());
}
/*
std::cout << "LIST OF PVS " << std::endl;
for (size_t i=0; i< bsPV.size(); ++i ) {
std::cout << i << " // " << bsPV[i].c_str() << " " << std::endl;
}
std::cout << std::endl;
*/
//std::cout << "NEW FILL: size of bsPV " << bsPV.size() << std::endl;
fillBSPV=false;
} //if fill
}
else if (fastWriter.write(parsedFromString["htype"]).find("bsr_reconnect-1.0") != std::string::npos)
{
std::cout << "RECONNECT CONTROL MESSAGE " << std::endl;
std::cout << "HAS NEW SOURCE ADDRESS: " << fastWriter.write(parsedFromString["address"]) << std::endl;
}
else if (fastWriter.write(parsedFromString["htype"]).find("bsr_stop-1.0") != std::string::npos)
{
std::cout << "STOP CONTROL MESSAGE " << std::endl;
std::cout << "RECEIVED: " << fastWriter.write(parsedFromString["htype"]) << std::endl;
std::cout << "From BS Documentation: Message can be ignored as source will send from same address after startup" << std::endl;
}
else
{
std::cout << "HEADER IS SOMETHING ELSE: " << std::endl;
std::cout << parsedFromString["htype"] << std::endl;
exit(1);
}
//std::cout << "/----------------------------------/" << std::endl;
}
}
if (nSequentialHeader >3 && is_text==1)
{
std::cout << "WARNING: ZEROMQ SUB-MESSAGE DOES NOT CLOSE " << std::endl;
std::cout << "WARNING: FORCING ZMQ_MSG_CLOSE " << std::endl;
std::cout << "No of sequential headers " << nSequentialHeader << std::endl;
std::cout << "is_text " << is_text << std::endl;
//zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
//zmq_msg_close (&message);
//std::cout << "message mulipart --> more == " << more << std::endl;
//nSequentialHeader=0;
//cannot break; results in seqmentation fault
//if (!more) {
// dbpm.status=ERRNO_EAGAIN; //ICAFE_ERRNO_BASE +zmq_errno() ;
// std::cout << "EARLY BREAK subMessage total: " << subMessage << std::endl;
// puts ("//------------------------------------END-------------------------------------//");
// break; // Last message
//}
}
union foo
{
char c[sizeof(double)];
double d;
} bar;
//SIZE 16 is timestamp
//SIZE 8 is x,y,Q
//SIZE 2 is valid
if (subMessage > 1430)
{
std::cout << " SIZE " << size << " subMessage " << subMessage << std::endl;
std::cout << "size of bsPV " << bsPV.size() << std::endl;
}
if (subMessage > 0 && subMessage%2 ==0)
{
if (size==8)
{
// big endian
for (char_nbr = 0; char_nbr < size; char_nbr++)
{
bar.c[char_nbr]=data[size-1-char_nbr]; // THis works for big engian
}
// little endian
//for (char_nbr = 0; char_nbr < size; char_nbr++) {
// bar.c[char_nbr]=data[char_nbr];
//}
//std::cout << "UNION D " << bar.d << std::endl;
double v; // = (double*) data;
memcpy(&v, bar.c, sizeof(double));
//std::cout << " double val " << v << std::endl;
//This is BPM Data - value
++bsPVIdx;
if (dbpm.getPVIdx(bsPV[bsPVIdx]) <0)
{
std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
std::cout << " bsPV index = " << bsPVIdx << std::endl;
std::cout << " pv from bs = " << bsPV[bsPVIdx] << std::endl;
std::cout << " Illegal index Value =" << dbpm.getPVIdx(bsPV[bsPVIdx]) << std::endl;
std::cout << " SKIPPING THIS BPM... " << std::endl;
continue;
}
dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].set(v);
dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].setStatus(ICAFE_NORMAL);
//std::cout << " readback of value that was set = " << dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].val[0].d << std::endl;
}
else if (size==2)
{
unsigned short iv = 0;
//memcpy(&iv, cmd, size);
//big endian
for (size_t n=0; n < size; n++)
{
iv = (iv << 8) + data[n];
}
//little endian
//for (size_t n = size; n >= 0; n--) {
// iv = (iv << 8) + data[n];
//}
//This is BPM Data - VALID/INVALID
//std::cout << "uint val (1 means valid) " << iv << std::endl;
/*
std::copy(data, data + 32, reinterpret_cast<char *>(&i));
std::cout << "uint16 val " << i << std::endl;
std::copy(data, data + 8, reinterpret_cast<char *>(&i));
std::cout << "uint16 val " << i << std::endl;
std::copy(data, data + 4, reinterpret_cast<char *>(&i));
std::cout << "uint16 val " << i << std::endl;
std::copy(data, data + 2, reinterpret_cast<char *>(&i));
std::cout << "uint16 val " << i << std::endl;
std::copy(data, data + 1, reinterpret_cast<char *>(&i));
std::cout << "uint16 val " << i << std::endl;
*/
++bsPVIdx;
if (dbpm.getPVIdx(bsPV[bsPVIdx]) <0)
{
std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
std::cout << " bsPV index = " << bsPVIdx << std::endl;
std::cout << " pv from bs = " << bsPV[bsPVIdx] << std::endl;
std::cout << " Illegal index Value =" << dbpm.getPVIdx(bsPV[bsPVIdx]) << std::endl;
std::cout << " SKIPPING THIS BPM ENUM TYPE " << std::endl;
continue;
}
if (iv==1)
{
dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].set((std::string) "VALID");
}
else
{
dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].set((std::string) "INVALID");
}
//std::cout << "value DBPM = " << dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].val[0].str << std::endl;
dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].setStatus(ICAFE_NORMAL);
}
}
else if (subMessage > 1 && subMessage%2 ==1)
{
//std::cout << "timestamp " << std::endl;
unsigned int a=0,b=0;
for (size_t n=0; n < size/2; n++)
{
a = (a << 8) + data[n];
}
for (size_t n=size/2; n < size; n++)
{
b = (b << 8) + data[n];
}
//std::cout << "a " << a << " b " << b << std::endl;
if (dbpm.getPVIdx(bsPV[bsPVIdx]) <0)
{
std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
std::cout << " bsPV index = " << bsPVIdx << std::endl;
std::cout << " pv from bs = " << bsPV[bsPVIdx] << std::endl;
std::cout << " Illegal index Value =" << dbpm.getPVIdx(bsPV[bsPVIdx]) << std::endl;
std::cout << " SKIPPING THIS BPM TIMESTAMP " << std::endl;
continue;
}
dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].ts.secPastEpoch=a;
dbpm.pvd[dbpm.getPVIdx(bsPV[bsPVIdx])].ts.nsec=b;
}
//std::cout << "subMessage above: " << subMessage << std::endl;
++subMessage;
} //ifelse
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&message);
//std::cout << "value of more " << more << std::endl;
if (!more)
{
dbpm.status=ICAFE_NORMAL;
// std::cout << "subMessage total: " << subMessage << std::endl;
// std::cout << " of which 0 size: " << nZeroSize << std::endl;
// std::cout << " Percentage good: " << (subMessage-nZeroSize-2)*100/(subMessage-2);
// puts ("//------------------------------------END-------------------------------------//");
break; // Last message part
}
} //while 1
inDumpFlag=0;
if (subMessage>2)
{
dbpm.status=ICAFE_NORMAL;
//std::cout << "subMessage total: " << subMessage << std::endl;
//std::cout << " of which 0 size: " << nZeroSize << std::endl;
//std::cout << " Percentage good: " << (subMessage-nZeroSize-2)*100/(subMessage-2);
//puts ("//------------------------------------END-------------------------------------//");
}
//std::cout << "end of loop " << std::endl;
return;
}
/**
* \brief Receives all message parts from zeromq socket
* \param bsd in/output: BSDataHolder object
* \return long ICAFE_NORMAL else ECAFE_error
*/
static void z_bsread (BSDataHolder &bsd)
{
#define __METHOD__ "z_bsread (BSDataHolder &bsd)"
unsigned int localDebug=0;
//We need to read dat a header every time since
//so that header data is caught after a bs restart(!)
dataHeaderReceived=false;
bsd.overallStatus=ICAFE_NORMAL;
//Data type index
if (!bsdtInsertFlag)
{
CAFE_BSHELPER::bsdtInsert();
bsdtInsertFlag=true;
}
bsreadContainer_set_by_name & name_index = CAFE_BSHELPER::bsdt.get<by_bsName> ();
bsreadContainer_set_by_name::iterator it_name;
unsigned int bsdtIndex;
if(localDebug)puts ("//START----------------------------------------//\n");
int subMessage=0;
void * socket = bsd.subscriber;
int64_t more =0; // Multipart detection
size_t more_size = sizeof (more);
//Keep a count of the number of MULTIpart messages with zero data
int nZeroSize=0;
int64_t ts, tnsec;
//Matching PV Index to BSChannel in bsd
int bsPVIdx=-1;
#if HAVE_JSON
Json::Value parsedFromString;
Json::Reader reader;
Json::FastWriter fastWriter;
#endif
bool parsingSuccessful;
bool mainHeaderReceived=false;
bool newHash=false;
using namespace boost::posix_time;
ptime timeStart(microsec_clock::local_time());
double timeElapsed=0;
unsigned int npoll =0;
//subMessage is incremeneted at end of multi-part message
//Process all parts of the message
//The Loop
while (1)
{
zmq_msg_t message;
zmq_msg_init (&message);
size_t size = zmq_msg_recv (&message, socket, 0);
//Continue goes to top of while
//Why not add submessage here
/*
++subMessage;
//std::cout << "SUBMESSAGE " << subMessage << std::endl;
more=0; // Multipart detection
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
//zmq_msg_close (&message);
if (!more)
{
zmq_msg_close (&message);
//if(localDebug)
puts ("//END----------------------------------------//");
break; // Last message part
}
continue;
*/
if(localDebug)printf ("[%03lu] \n", size);
if (size == -1)
{
std::cout << "z_bsread.h: Error is " << zmq_errno() << " " << zmq_strerror(zmq_errno()) << std::endl;
//Resource unavailable means that there is nothing to read now
zmq_msg_close (&message);
bsd.overallStatus=ICAFE_ERRNO_BASE+zmq_errno();
//std::cout << "z_bsread.h: Error is os " << bsd.overallStatus << std::endl;
for (size_t i=0; i < bsd.getNPV(); ++i)
{
if(bsd.getBSChannel(i).isBSEnabled())
{
bsd.pvd[i].set(0);
bsd.pvd[i].setStatus(bsd.overallStatus);
bsd.pvd[i].setAlarmStatus(-1);
bsd.pvd[i].setAlarmSeverity(-1);
bsd.pvd[i].ts.secPastEpoch=0;
bsd.pvd[i].ts.nsec =0;
bsd.pvd[i].setPulseID(0);
}
}
break;
}
else if (size == 0)
{
if(localDebug)std::cout << " Data of Zero SIZE for submessage " << subMessage << std::endl;
++nZeroSize;
//Avoid timestamp blob in count
if (nZeroSize%2==1)
{
++bsPVIdx; //Increment when data
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(0);
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setStatus(ECAFE_BSREAD_MULTIPART_MESS_NODATA);
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setPulseID(bsd.getPulse_id());
}
else //Zero Timestamp
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].ts.secPastEpoch=0;
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].ts.nsec =0;
}
if (bsd.overallStatus == ICAFE_NORMAL)
{
bsd.overallStatus=ECAFE_BSREAD_MULTIPART_MESS_NODATA;
}
}
else
{
// Dump the message as text or binary
if(localDebug)printf ("[%03lu] \n", size);
char *data = (char*)zmq_msg_data (&message);
//Check if it is text and is so if it is the main header or not
//First message is always the Main Header
if (!mainHeaderReceived)
{
//std::cout << "MAIN HEADER ================================================================================= " << std::endl;
parsingSuccessful=reader.parse(data, parsedFromString);
if (parsingSuccessful)
{
if (localDebug==1)
{
Json::StyledWriter styledWriter;
std::cout << "STYLED: --------------------------------" << std::endl;
std::cout << styledWriter.write(parsedFromString) << std::endl;
std::cout << "----------------------------------" << std::endl;
std::cout << "HEADER TYPE" << std::endl;
std::cout << parsedFromString["htype"].asString() << std::endl;
}
if (fastWriter.write(parsedFromString["htype"]).find("bsr_m-1.1") != std::string::npos)
{
if (localDebug)
{
std::cout << "hash " << parsedFromString["hash"].asString() << std::endl;
std::cout << "p id " << parsedFromString["pulse_id"].asUInt64() << std::endl;
std::cout << "g ts " << parsedFromString["global_timestamp"] << std::endl;
std::cout << "comp " << parsedFromString["dh_compression"].asString() << std::endl;
std::cout << "sec " << parsedFromString["global_timestamp"]["sec"].asUInt() << std::endl;
std::cout << "nsec " << parsedFromString["global_timestamp"]["ns"].asUInt() << std::endl;
}
//required
bsd.setHtype( parsedFromString["htype"].asString() );
if (bsd.getHash().compare(parsedFromString["hash"].asString()) !=0 )
{
bsd.setHash( parsedFromString["hash"].asString() );
newHash=true;
//std::cout << " NEW HASHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHh " << std::endl;
}
bsd.setPulse_id(parsedFromString["pulse_id"].asUInt64());
//std::cout << " Pulse ID " << bsd.getPulse_id() << std::endl;
//optional
if (!(parsedFromString["global_timestamp"]).empty())
{
bsd.setGlobal_timestamp(parsedFromString["global_timestamp"]["sec"].asUInt(),
parsedFromString["global_timestamp"]["ns"].asUInt());
}
//Check for Data Compression here!!
if (!(parsedFromString["dh_compression"].asString()).empty())
{
bsd.setDH_compression(parsedFromString["dh_compression"].asString() );
}
}
else if (fastWriter.write(parsedFromString["htype"]).find("bsr_reconnect-1.0") != std::string::npos)
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "RECONNECT CONTROL MESSAGE " << std::endl;
std::cout << "HAS NEW SOURCE ADDRESS: " << fastWriter.write(parsedFromString["address"]) << std::endl;
bsd.globalBSZmqStream = fastWriter.write(parsedFromString["address"]).c_str();
dataHeaderReceived=false;
}
else if (fastWriter.write(parsedFromString["htype"]).find("bsr_stop-1.0") != std::string::npos)
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "STOP CONTROL MESSAGE " << std::endl;
std::cout << "RECEIVED: " << fastWriter.write(parsedFromString["htype"]) << std::endl;
std::cout << "From BS Documentation: Message can be ignored as source will send from same address after startup" << std::endl;
dataHeaderReceived=false;
}
else
{
//Maybe this is not a header!??
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "HEADER IS UNRECOGNIZED: SOMETHING OTHER THAN DOCUMENETED! " << std::endl;
std::cout << parsedFromString["htype"] << std::endl;
std::cout << "EXIT PROGRAM " << std::endl;
dataHeaderReceived=false;
exit(1);
}
mainHeaderReceived=true;
} //parsing successful
else
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "PARSING FAILED !! " << std::endl;
std::cout << "CANNOT PROCEED WITHOUT MAIN HEADER INFORMATION! " << std::endl;
bsd.overallStatus=ECAFE_BSREAD_PARSEFAIL_MAINHEADER;
//Close message and break from loop
++subMessage;
std::cout << "SUBMESSAGE " << subMessage << std::endl;
more=0; // Multipart detection
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&message);
break;
}
} //ifMainHeaderReceived
else if ( (mainHeaderReceived && !dataHeaderReceived) || (newHash && dataHeaderReceived))
{
//std::cout << "DATA HEADER ================================================================================= " << std::endl;
//std::cout << "mh = " << mainHeaderReceived << " dh " << dataHeaderReceived << " nh " << newHash <<std::endl;
newHash=false;
//Is it compressed or not??
if (bsd.getDH_compression().compare("bitshuffle_lz4")==0)
{
/*
int uncompressedSize=0; int blockSize=0;
unfoldPreBlob(data, uncompressedSize, blockSize);
//decompress
int src_size=uncompressedSize/sizeof(char);
//use bitshuffle terminology
char* regen_buffer = (char *) malloc(src_size*sizeof(char));
if (regen_buffer == NULL) {
bsd.overallStatus=ECAFE_BITSHUFF_ALLOCMEM;
return;
}
int sizeBlob = size-BSREAD_PREBLOB_BYTES;
if (sizeBlob <1) {sizeBlob=size;} //Should not happen
char * dataBlob = new char[sizeBlob];
int ij=0;
for (int char_nbr = BSREAD_PREBLOB_BYTES; char_nbr < size; char_nbr++) {
dataBlob[ij]=data[char_nbr];
++ij;
}
//ECAFE_BITSHUFF_DECOMPRESS
//ECAFE_BITSHUFF_REALLOCMEM
const int decompressed_size = bshuf_decompress_lz4((const char *) dataBlob, regen_buffer, uncompressedSize, sizeof(char), 0);
delete [] dataBlob;
if(localDebug) std::cout << " " << " decompressed_size " << decompressed_size << std::endl;
if (decompressed_size < 0) {bsd.overallStatus=ECAFE_BITSHUFF_DECOMPRESS; return;}
else if (decompressed_size == 0){
if(localDebug)std::cout << "0 decompressed size! Should have a positive number for successful deompression?" <<std::endl;
}
else {
if(localDebug)std::cout << "We successfully decompressed some data!" <<std::endl;
}
// Not only does a positive return value mean success,
// value returned == number of bytes regenerated from compressed_data stream.
// We can use this to realloc() *regen_buffer to free up memory
//Do this to remove spurious trailing characters
regen_buffer = (char *)realloc(regen_buffer, decompressed_size);
if (regen_buffer == NULL) {
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "WARNING with status code: " << ECAFE_BITSHUFF_REALLOCMEM << std::endl;
std::cout << "Failed to reallocate memory for compressed data. Not a show stopper" << std::endl;
}
*/
char * regen_buffer = NULL;
int status=CAFE_BSHELPER::bitshuffleDecompress(data, regen_buffer, size, sizeof(char));
if (status==ICAFE_NORMAL)
{
parsingSuccessful=reader.parse((const char *) regen_buffer, parsedFromString);
}
else
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "Data Header Compression Algorithmn: " << bsd.getDH_compression() << " FAILED! " << std:: endl;
std::cout << "With Status Code: " << status << std:: endl;
std::cout << "Discarding remainder of multi-part message! " << status << std:: endl;
bsd.overallStatus=status;
free(regen_buffer);
//Close message and break from loop
++subMessage;
std::cout << "SUBMESSAGE " << subMessage << std::endl;
more=0; // Multipart detection
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&message);
break;
}
} //bitshuffle_lz4
else if (bsd.getDH_compression()=="none")
{
parsingSuccessful=reader.parse(data, parsedFromString);
}
else
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "Data Header Compression Algorithmn: " << bsd.getDH_compression() << " not recognized " << std:: endl;
std::cout << "Assuming no compression for data header " << std::endl;
parsingSuccessful=reader.parse(data, parsedFromString);
}
if (parsingSuccessful)
{
if (localDebug==1)
{
Json::StyledWriter styledWriter;
std::cout << "STYLED: --------------------------------//" << std::endl;
std::cout << styledWriter.write(parsedFromString) << std::endl;
std::cout << "----------------------------------//" << std::endl;
std::cout << "HEADER TYPE" << std::endl;
std::cout << parsedFromString["htype"].asString() << std::endl;
}
if (fastWriter.write(parsedFromString["htype"]).find("bsr_d-1.1") != std::string::npos)
{
if (localDebug) std::cout << "chan " << parsedFromString["channels"] << std::endl;
bsrdV.clear();
bsrdV.reserve(parsedFromString["channels"].size());
for (Json::Value::ArrayIndex i=0; i < parsedFromString["channels"].size(); ++ i)
{
//New line is spit out after parsedFromString[]
if (localDebug) {
std::cout << "name:" << parsedFromString["channels"][i]["name"].asString() << " //"<< std::endl;
std::cout << "enco:" << parsedFromString["channels"][i]["encoding"].asString() << " //"<< std::endl;
std::cout << "type:" << parsedFromString["channels"][i]["type"].asString() << " //" <<std::endl;
std::cout << "shape isArray " << parsedFromString["channels"][i]["shape"].isArray() << std::endl;
}
Json::Value iv;
Json::Value::ArrayIndex inpv=0;
//std::cout << "Value as int " << parsedFromString["channels"][i]["shape"][inpv].asDouble() << std::endl;
//std::cout << "SIZE " << parsedFromString["channels"][i]["shape"].size() << std::endl;
//std::cout << "COMP " << parsedFromString["channels"][i]["compression"].asString() << std::endl;
std::vector<int> lshape;
for (Json::Value::ArrayIndex il=0; il < parsedFromString["channels"][i]["shape"].size(); ++ il)
{
//std::cout << "Value as int// " << parsedFromString["channels"][i]["shape"][il].asInt() << std::endl;
lshape.push_back(std::max(1, parsedFromString["channels"][i]["shape"][il].asInt()) );
}
std::string NAME = parsedFromString["channels"][i]["name"].asString();
std::string ENCO = parsedFromString["channels"][i]["encoding"].asString(); // big default=little
if (ENCO.empty())
{
ENCO="little"; //default
}
std::string TYPE = parsedFromString["channels"][i]["type"].asString();
if (TYPE.empty())
{
TYPE="float64"; //default
}
std::string COMP = parsedFromString["channels"][i]["compression"].asString();
if (COMP.empty())
{
COMP="none"; //default
}
//std::cout << "TYPE " << TYPE << std::endl;
//std::cout << "TYPE: SIZE= " << TYPE.size() << std::endl;
//std::cout << TYPE.substr(0,TYPE.size()) << std::endl;
BSChannel bsc(NAME);
//Need to find correct entry
//int idx= bsd.getIdxFromName(NAME);
//BSChannel bsc= bsd.getBSChannel(idx); // instead of i
bsc.setShape(lshape);
//Need to fill in Channel Name, encoding and Type...
bsc.setType(TYPE);
bsc.setEncoding(ENCO);
bsc.setCompression(COMP);
bsrdV.push_back(bsc);
if ( bsd.getPVIdx(NAME) < 0 )
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
std::cout << " pv requested = " << NAME << std::endl;
std::cout << " Illegal index Value =" << bsd.getPVIdx(NAME) << std::endl;
std::cout << " NO FURTHER ACTION REQUIRES " << std::endl;
}
else
{
//Ensure there is sufficent space to hold returned data,, e.g. may be an array
bsd.pvd[ bsd.getPVIdx(NAME)].setNelem(bsc.getNelem());
}
int idx= bsd.getIdxFromName(NAME);
bsd.setBSChannel(idx, bsc);
//std::cout << "The No elements are " << bsd.getBSChannel(i).getNelem() << std::endl;
//There may be more channels coming through this stream than requested
//therefore first fill a local vector<BSChannel>
//and then map this to that in BSDataHolder!
//It is conceivable that they do not match exactly
} // for JsonValue
if (localDebug)
{
for (int i=0; i< bsrdV.size(); ++i)
{
std::cout << i << " " << bsrdV[i].getName() << " " << bsd.getPVIdx( bsrdV[i].getName()) << std::endl;
}
std::cout << "No of channels " << bsrdV.size() << " " << parsedFromString["channels"].size() << std::endl;
std::cout << "----------------------------------" << std::endl;
}
} //bsr_d
dataHeaderReceived=true; //set FLAG
} //parsingsuccessful
else
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "PARSING FAILED !! " << std::endl;
std::cout << "CANNOT PROCEED WITHOUT DATA HEADER INFORMATION! " << std::endl;
bsd.overallStatus=ECAFE_BSREAD_PARSEFAIL_DATAHEADER;
//Close message and break from loop
++subMessage;
std::cout << "SUBMESSAGE " << subMessage << std::endl;
more=0; // Multipart detection
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&message);
break;
}
} //dataHeader
else if (subMessage==1)
{
//header again so we skip this as hash is the same!
//Do nothing
if(localDebug) std::cout << "HEADER INFO ALREADY REGISTERED " << std::endl;
}
else if (subMessage%2 ==1)
{
//std::cout << "timestamp " << std::endl;
if (bsd.getPVIdx(bsrdV[bsPVIdx].getName()) <0)
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
std::cout << " bsPV index = " << bsPVIdx << std::endl;
std::cout << " pv from bs = " << bsrdV[bsPVIdx].getName() << std::endl;
std::cout << " Expect illegal BSDataHolder index value =" << bsd.getPVIdx(bsrdV[bsPVIdx].getName()) << std::endl;
std::cout << " SKIPPING THIS SUBMESSAGE... " << subMessage << std::endl;
}
else
{
ts=0;
tnsec=0;
//long test=1518777482;
//printBits(sizeof(test), &test);
//printBits(size, &data);
//size==16
/*bits come in this order for 64bit floats: SARFE10-PSSS059:SPECTRUM_CENTER
01110101
00001011
11000110
01011111
00000000
00000000
00000000
00000000
whereas for other datatypes
00000000
00000000
00000000
00000000
01011111
11000110
00001011
01110101
ts
0000000000000000000000000000000001011111110001100101100000001111
ns
0000000000000000000000000000000000000010011100011001110001000000
So First need to verify which format is coming through:
If First type - then reorder!
*/
bool littleEndian = false;
int den = 4; //must be 4
for (size_t n=0; n < 16/den; n++) {
char jh = data[n];
//printBits(sizeof(jh), &jh);
if (hasMinOneBitSet(sizeof(jh), &jh) )
{
littleEndian = true ;
break;
}
}
if (localDebug) {
std::cout << "--------------------------------" << std::endl;
for (size_t n=0; n < size; n++) {
char jh = data[n];
printBits(sizeof(jh), &jh);
}
}
//std::cout << "nano_seconds" << std::endl;
//for (size_t n=size/2; n < size; n++) {
//char jh = data[n];
//printBits(sizeof(jh), &jh);
//}
/*
if (littleEndian ) {
//little endian
int in = 0;
for (unsigned int n = (16-1); n >= 0; n--) {
std::cout << n << " // " << in << std::endl;
data[in] = dataOriginal[n];
++in;
}
std::cout << "LITTE ENDIAN TS = " << ts << std::endl;
}
*/
if (littleEndian) {
//std::cout << " TS littleEndian " << std::endl;
//little endian
//for (size_t n = size; n >= 0; n--) {
// iv = (iv << 8) + data[n];
//} sum += bytes[0] | (bytes[1]<<8) | (bytes[2]<<16) | (bytes[3]<<24);
ts = 0xFF & data[0] | (0xFFFF & data[1]<<8) | (0xFFFFFF & data[2]<<16) | (0xFFFFFFFF & data[3]<<24);
/*
short inn=size/2-1;
for (; inn>=0; inn--)
{
ts = (ts<<8) + data[inn];
}
*/
}
else {
//std::cout << " TS bigEndian " << std::endl;
for (size_t n=0; n < size/2; n++)
{
char jh = data[n];
//printBits(sizeof(jh), &jh);
if ( (n+1) < size/2 )
{
if ( data[n+1] & 0x80)
{
data[n]=data[n]+1;
}
}
//if (n==4) data[n]=data[n]+1; //Just what is the compiler playing at when shifting bits!??
//if (n==5) data[n]=data[n]+1; //ditto
ts = (ts<<8) + data[n];
}
}
//printBits(sizeof(ts), &ts);
/*
if (littleEndian) {
uint64_t ts = __builtin_bswap64 ((uint64_t) ts);
std::cout << "ts after bitswap " << ts << " " << (unsigned long) ts << std::endl;
}
*/
/*
uint32_t num = (unsigned int) ts;
uint32_t b0,b1,b2,b3;
uint32_t res;
b0 = (num & 0x000000ff) << 24u;
b1 = (num & 0x0000ff00) << 8u;
b2 = (num & 0x00ff0000) >> 8u;
b3 = (num & 0xff000000) >> 24u;
res = b0 | b1 | b2 | b3;
printf("num %d, RES %d= \n",num, res);
printf("RES = %lX \n", res);
printBits(sizeof(ts), &ts);
*/
if(localDebug)std::cout << "ts original way " << ts << "///" << std::endl;
//unsigned short idx=0;
//int ts_int=( ( (data[idx+7]<<0) & 0x7f) | ((data[idx+7]<<0) & 0x80) | (data[idx+6]<<8) | (data[idx+5]<<16) | (data[idx+4]<<24)
//int ts_int=( (data[idx+7]<<0) | (data[idx+6]<<8) | (data[idx+5]<<16) | (data[idx+4]<<24)
// | ((long long) data[idx+3]<<32) | ((long long) data[idx+2]<<40) | ((long long) data[idx+1]<<48) | ((long long) data[idx]<<56) ) ;
//std::cout << "ts_int= " << ts_int << std:: endl;
//idx=4;
//long long ts_long=( ( (data[idx+3]<<0) ) | ((data[idx+3]<<0) & 0x80) | ( (data[idx+2]<<8) ) | ((data[idx+2]<<8) & 0x80) |
// ( (data[idx+1]<<16) ) | ((data[idx+1]<<16) & 0x80) | ( (data[idx]<<24) ) | ((data[idx]<<24) & 0x80) );
//std::cout << "ts_long= " << ts_long << std:: endl;
if (littleEndian) {
//std::cout << " nsec littleEndian " << std::endl;
//little endian
//for (size_t n = size; n >= 0; n--) {
// iv = (iv << 8) + data[n];
//}
/*
short inn=size-1;
for (; inn>=size/2; inn--)
{
tnsec = (tnsec) + data[inn];
}
*/
tnsec = 0xFF & data[8] | (0xFFFF & data[9]<<8) | (0xFFFFFF & data[10]<<16) | (0xFFFFFFFF & data[11]<<24);
}
else {
//std::cout << " nsec bigEndian " << std::endl;
for (size_t n=size/2; n < size; n++)
{
char jh = data[n];
//printBits(sizeof(jh), &jh);
//if (n==12) data[n]=data[n]+1;
if ( (n+1) < size )
{
if ( data[n+1] & 0x80 )
{
data[n]=data[n]+1;
//std::cout << "------------" << std::endl;
//jh = data[n];
//printBits(sizeof(jh), &jh);
//std::cout << "------------" << std::endl;
}
}
tnsec = (tnsec<<8) + data[n];
}
}
//printBits(sizeof(tnsec), &tnsec);
if(localDebug)std::cout << "tns original way " << tnsec << std::endl;
//std::cout << "TESTING...................................................... " << std::endl;
//idx=8;
//tnsec=( ( (data[idx+7]<<0) & 0x7f) | ((data[idx+7]<<0) & 0x80) | (data[idx+6]<<8) | (data[idx+5]<<16) | (data[idx+4]<<24)
// | ((long long) data[idx+3]<<32) | ((long long) data[idx+2]<<40) | ((long long) data[idx+1]<<48) | ((long long) data[idx]<<56) ) ;
if(localDebug)std::cout << "ts " << ts << " tnsec " << tnsec << std::endl;
//Add to bsd
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].ts.secPastEpoch=ts;
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].ts.nsec =tnsec;
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setBeamEventNo(bsd.getPulse_id());
}
}
else
{
//std::cout << " NOW COMES THE DATA " << std::endl;
//if (localDebug) {
// for (int i=0; i< bsrdV.size(); ++i) {
// std::cout << i << " " << bsrdV[i].getName() << " " << bsd.getPVIdx(bsrdV[i].getName()) << std::endl;
// }
// std::cout << "No of channels " << bsrdV.size() << std::endl;
// std::cout << "-----------------------" << std::endl;
//}
//Now comes the data
++bsPVIdx;
if (bsd.getPVIdx(bsrdV[bsPVIdx].getName()) <0)
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << " WARNING--> THIS CHANNEL WAS NOT REQUESTED IN CONFIGURATION FILE " << std::endl;
std::cout << " bsPV index = " << bsPVIdx << std::endl;
std::cout << " pv from bs = " << bsrdV[bsPVIdx].getName() << std::endl;
std::cout << " Expect illegal BSDataHolder index value =" << bsd.getPVIdx(bsrdV[bsPVIdx].getName()) << std::endl;
std::cout << " SKIPPING THIS SUBMESSAGE... " << subMessage << std::endl;
more=0; // Multipart detection
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&message);
++subMessage;
if (!more)
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << " No more submessages! " << std::endl;
std::cout << " This message implies that pv was not accompanied by a timestamp!! Something not quite right. " << std::endl;
break; // Last message part
}
continue; //top of while
}
if(localDebug)
{
std::cout << "index= " << bsPVIdx << std::endl;
std::cout << "Name = " << bsrdV[bsPVIdx].getName() << " " << bsrdV[bsPVIdx].getType() << std::endl;
std::cout << "IDx = " << bsd.getPVIdx(bsrdV[bsPVIdx].getName()) << std::endl;
}
//Find enum of data type
it_name = name_index.find(bsrdV[bsPVIdx].getType());
if ( (it_name) != name_index.end() )
{
bsdtIndex=(*it_name).by_bsID;
}
//Do we need to compress the data?
bool compressedFlag=false;
BSChannel bsc=bsd.getBSChannel(bsrdV[bsPVIdx].getName());
char * regen_buffer;
//std::cout << "CHANNEL==" << bsrdV[bsPVIdx].getName() << std::endl;
//std::cout << "COMPRESSION ============" << bsc.getCompression() << std::endl;
if ( bsc.getCompression().compare("bitshuffle_lz4")==0)
{
//Decompress
int status=CAFE_BSHELPER::bitshuffleDecompress(data, regen_buffer, size, getByteSize(bsdtIndex));
if (status==ICAFE_NORMAL)
{
parsingSuccessful=reader.parse((const char *) regen_buffer, parsedFromString);
//std::cout << "parsingSuccessful====" << parsingSuccessful << std::endl;
}
else
{
free(regen_buffer);
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setStatus(status);
bsd.overallStatus=status;
//skip message
more=0; // Multipart detection
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&message);
++subMessage;
if (!more)
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << " No more submessages! " << std::endl;
std::cout << " This message implies that pv was not accompanied by a timestamp!! Something not quite right. " << std::endl;
break; // Last message part
}
continue; //top of while
}
compressedFlag=true;
}
else
{
//No compression required
regen_buffer = data; //new char[size];
//regen_buffer = new char[size];
//for (int i=0; i<size; ++i) {
// std::cout << i << " " << (unsigned short) data[i] << std::endl;
// }
//std::cout << size << " /// " << bsrdV[bsPVIdx].getNelem()<< std::endl;
//std::cout << "regen_buffer" << std::endl;
//std::cout << (int *) regen_buffer << std::endl;
}
//std::cout << "Compression//0//--> " << bsd.getBSChannel(0).getCompression() << std::endl;
//std::cout << "Compression//1//--> " << bsd.getBSChannel(1).getCompression() << std::endl;
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setStatus(ICAFE_NORMAL);
//char,bool=1, short=2, int =4, long,long long=8
switch (bsdtIndex)
{
case CAFE_BSHELPER::BS_STRING:
{
if (bsrdV[bsPVIdx].getNelem()>1)
{
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "An array of strings has not been envisioned for bsREAD" << std::endl;
std::cout << "Assuming one string element of max size PVNAME_SIZE" << std::endl;
}
if (size > MAX_STRING_SIZE)
{
std::cout << "Datatype is string. Size " << size << " too large; trimming to MAX_STRING_SIZE" << std::endl;
}
std::string str="";;
for (int i=0; i<std::min((int)size, (int)MAX_STRING_SIZE); ++i)
{
if (data[i] != '\0')
{
str.append(1, data[i] );
//std::cout << str << " [" << i << "] " << std::endl;
}
}
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(str);
if(localDebug)std::cout << " readback of string value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].str << std::endl;
break;
}
case CAFE_BSHELPER::BS_FLOAT32:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetFloat.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if(localDebug)std::cout << " readback of float scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].f << std::endl;
break;
}
//std::vector<float> Vbs( bsrdV[bsPVIdx].getNelem()); //Must allocate memory here
//bsetFloatV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
//for (int ik=0; ik<Vbs.size(); ++ik) std::cout << Vbs[ik] << " [" << ik << "] " << std::endl;
//std::cout << "NELEM " << bsrdV[bsPVIdx].getNelem() << " Encoding" << bsrdV[bsPVIdx].getEncoding() << std::endl;
float * ff = bsetFloat.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
//std::cout << "float " << ff[0] << std::endl;
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ff);
if(localDebug)std::cout << " readback of float value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].f << std::endl;
delete [] ff;
break;
}
case CAFE_BSHELPER::BS_FLOAT64:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetDouble.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if (localDebug) std::cout << " readback of double scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].d << std::endl;
break;
}
//std::vector<double> Vbs( bsrdV[bsPVIdx].getNelem()); //Must allocate memory here
//bsetDoubleV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
double * dd = bsetDouble.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(dd);
if (localDebug) std::cout << " readback of double value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].d << std::endl;
delete [] dd;
break;
}
case CAFE_BSHELPER::BS_INT64:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetLongLong.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if(localDebug)std::cout << " readback of int64 scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsLongLong() << std::endl;
break;
}
//std::vector<long long> Vbs;
//bsetLongLongV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
long long * ll=bsetLongLong.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ll);
if(localDebug)
{
std::cout << bsrdV[bsPVIdx].getName() << std::endl;
std::cout << " readback of int64 value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsLongLong() << std::endl;
//for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
// std::cout << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsLongLong(i) << " [" << i << "] " ;
//}
//std::cout << std::endl;
}
delete [] ll;
break;
}
case CAFE_BSHELPER::BS_UINT64:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetULongLong.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if(localDebug)std::cout << " readback of uint64 scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULongLong() << std::endl;
break;
}
//std::vector<unsigned long long> Vbs;
//bsetULongLongV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
unsigned long long * ull=bsetULongLong.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ull);
if(localDebug)
{
std::cout << " readback of uint64 value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULongLong() << std::endl;
//for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
// std::cout << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULongLong() << " [" << i << "] " ;
//}
std::cout << std::endl;
}
delete [] ull;
break;
}
case CAFE_BSHELPER::BS_INT32:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetInt.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if(localDebug)std::cout << " readback of int32 scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].l << std::endl;
break;
}
//std::vector<int> Vbs;
//bsetIntV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
int * l=bsetInt.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(l);
if(localDebug)
{
std::cout << bsrdV[bsPVIdx].getName() << std::endl;
std::cout << " readback of int32 value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].l << std::endl;
for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i)
{
std::cout << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].l << " [" << i << "] " ;
}
std::cout << std::endl;
}
delete [] l;
break;
}
case CAFE_BSHELPER::BS_UINT32:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetUInt.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if(localDebug)std::cout << " readback of uint32 scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULong() << std::endl;
break;
}
//std::vector<unsigned int> Vbs;
//bsetUIntV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
unsigned int * ul=bsetUInt.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ul);
if(localDebug)
{
std::cout << bsrdV[bsPVIdx].getName() << std::endl;
std::cout << " readback of uint32 value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULong() << std::endl;
//for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
// std::cout << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getAsULong(i) << " [" << i << "] " ;
//}
//std::cout << std::endl;
}
delete [] ul;
break;
}
case CAFE_BSHELPER::BS_INT16:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetShort.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if(localDebug)std::cout << " readback of short scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].s << std::endl;
break;
}
//std::vector<short> Vbs;
//bsetShortV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
short * s=bsetShort.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
//std::cout << "SHORT VALUE " << s[0] << std::endl;
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(s);
if(localDebug)
{
std::cout << " readback of short value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].s << std::endl;
//std::cout << " nelements in pvdata " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].getNelem() << std::endl;
//for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
// std::cout << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].s << " [" << i << "] " ;
//}
//std::cout << std::endl;
}
delete [] s;
break;
}
case CAFE_BSHELPER::BS_UINT16:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetUShort.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if(localDebug)std::cout << " readback of ushort scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].us << std::endl;
break;
}
//std::vector<unsigned short> Vbs;
//bsetUShortV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
unsigned short * us=bsetUShort.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(us);
if(localDebug)
{
std::cout << " readback of ushort value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].us << std::endl;
//for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
// std::cout << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].us << " [" << i << "] " ;
//}
//std::cout << std::endl;
}
delete [] us;
break;
}
case CAFE_BSHELPER::BS_INT8:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetChar.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if(localDebug)std::cout << " readback of char scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].ch << std::endl;
break;
}
//std::vector<char> Vbs;
//bsetCharV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
char * ch=bsetChar.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(ch); //set(ch);
if(localDebug)
{
std::cout << " readback of char value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].ch << std::endl;
//for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
// std::cout << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].ch << " [" << i << "] " ;
//}
//std::cout << std::endl;
}
delete [] ch;
break;
}
case CAFE_BSHELPER::BS_UINT8:
case CAFE_BSHELPER::BS_BOOL:
{
if ( bsrdV[bsPVIdx].getNelem()==1)
{
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set( bsetUChar.unfoldScalar(regen_buffer, bsrdV[bsPVIdx].getEncoding()) );
if(localDebug)std::cout << " readback of unsigned char scalar that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].ch << std::endl;
break;
}
//std::vector<unsigned char> Vbs;
//bsetUCharV.unfold(regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding(),Vbs);
unsigned char * uch=bsetUChar.unfold (regen_buffer, bsrdV[bsPVIdx].getNelem(), bsrdV[bsPVIdx].getEncoding());
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].set(uch);
if(localDebug)
{
std::cout << " readback of unsigned char value that was set = " << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[0].ch << std::endl;
//for (int i=0; i < bsrdV[bsPVIdx].getNelem(); ++i) {
// std::cout << bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].val[i].ch << " [" << i << "] " ;
//}
//std::cout << std::endl;
}
delete [] uch;
break;
}
default:
std::cout << __FILE__ << "//" << __LINE__ << "//" << __METHOD__ << std::endl;
std::cout << "ECAFE_INVALID_SWITCH_CASE" << std::endl;
print_out_by<by_bsID>(bsdt);
bsd.pvd[bsd.getPVIdx(bsrdV[bsPVIdx].getName())].setStatus(ECAFE_INVALID_SWITCH_CASE);
}
if (compressedFlag)
{
free(regen_buffer);
}
}
} //top else
// THIS HAS TO BE COPIED TO BEFORE CONTINUE!
//Continue goes to top of while
//Why not add submessage here
++subMessage;
//std::cout << "SUBMESSAGE " << subMessage << std::endl;
more=0; // Multipart detection
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
zmq_msg_close (&message);
if (!more)
{
if(localDebug)puts ("//END----------------------------------------//");
break; // Last message part
}
} //while 1
ptime timeEnd(microsec_clock::local_time());
time_duration duration(timeEnd-timeStart);
timeElapsed= (double) duration.total_microseconds()/1000000.0;
//++npoll;
//timeAvg = (timeAvg + timeElapsed)/npoll;
//std::cout << __METHOD__ << __LINE__ << " TimeElapsed " << timeElapsed << " " << std::endl;
//std::cout << " Pulse_id: " << bsd.getPulse_id() <<std::endl;
//First two messages are main header and data header
if (subMessage>1)
{
//bsd.overallStatus=ICAFE_NORMAL; // already at top; will be overwritten in case of error
bsd.setNChannels((subMessage-2)/2);
bsd.setNNullData(nZeroSize/2);
bsd.setPGoodData((subMessage-nZeroSize-2)*100/(std::max((subMessage-2),1)) );
if(localDebug)
{
std::cout << " No of submessages: " << subMessage << std::endl;
std::cout << " of which zero size have: " << nZeroSize << std::endl;
std::cout << " No of channels: " << bsd.getNChannels()<< std::endl;
std::cout << " of which zero size have: " << bsd.getNNullData() << std::endl;
std::cout << " Percentage good: " << bsd.getPGoodData() <<std::endl;
std::cout << " Pulse_id: " << bsd.getPulse_id() <<std::endl;
puts ("//------------------------------------END-------------------------------------//");
}
}
//std::cout << "z_bsread.h overallstatus " << bsd.overallStatus << std::endl;
return;
#undef __METHOD__
}
#endif // ZBSREAD_H