exposing receiver thread ids to client (#102)

* exposing receiver thread ids to client

Co-authored-by: Erik Frojdh <erik.frojdh@gmail.com>
This commit is contained in:
Dhanya Thattil
2020-06-09 16:18:37 +02:00
committed by GitHub
parent 2a2bb5f63a
commit f5160b0978
48 changed files with 1151 additions and 580 deletions

View File

@ -36,6 +36,7 @@ ClientInterface::ClientInterface(int portNumber)
portNumber(portNumber > 0 ? portNumber : DEFAULT_PORTNO + 2),
server(portNumber) {
functionTable();
parentThreadId = syscall(SYS_gettid);
// start up tcp thread
tcpThread =
sls::make_unique<std::thread>(&ClientInterface::startTCPServer, this);
@ -71,8 +72,8 @@ void ClientInterface::registerCallBackRawDataModifyReady(
}
void ClientInterface::startTCPServer() {
LOG(logINFOBLUE) << "Created [ TCP server Tid: " << syscall(SYS_gettid)
<< "]";
tcpThreadId = syscall(SYS_gettid);
LOG(logINFOBLUE) << "Created [ TCP server Tid: " << tcpThreadId << "]";
LOG(logINFO) << "SLS Receiver starting TCP Server on port " << portNumber
<< '\n';
// server = sls::make_unique<sls::ServerSocket>(portNumber);
@ -102,8 +103,7 @@ void ClientInterface::startTCPServer() {
if (receiver) {
receiver->shutDownUDPSockets();
}
LOG(logINFOBLUE) << "Exiting [ TCP server Tid: " << syscall(SYS_gettid)
<< "]";
LOG(logINFOBLUE) << "Exiting [ TCP server Tid: " << tcpThreadId << "]";
}
// clang-format off
@ -199,6 +199,7 @@ int ClientInterface::functionTable(){
flist[F_GET_RECEIVER_PROGRESS] = &ClientInterface::get_progress;
flist[F_SET_RECEIVER_NUM_GATES] = &ClientInterface::set_num_gates;
flist[F_SET_RECEIVER_GATE_DELAY] = &ClientInterface::set_gate_delay;
flist[F_GET_RECEIVER_THREAD_IDS] = &ClientInterface::get_thread_ids;
for (int i = NUM_DET_FUNCTIONS + 1; i < NUM_REC_FUNCTIONS ; i++) {
LOG(logDEBUG1) << "function fnum: " << i << " (" <<
@ -564,6 +565,8 @@ void ClientInterface::setDetectorType(detectorType arg) {
if (rawDataModifyReadyCallBack != nullptr)
impl()->registerCallBackRawDataModifyReady(rawDataModifyReadyCallBack,
pRawDataReady);
impl()->setThreadIds(parentThreadId, tcpThreadId);
}
int ClientInterface::set_roi(Interface &socket) {
@ -1746,4 +1749,10 @@ int ClientInterface::set_gate_delay(Interface &socket) {
std::to_string(gateIndex));
}
return socket.Send(OK);
}
int ClientInterface::get_thread_ids(Interface &socket) {
auto retval = impl()->getThreadIds();
LOG(logDEBUG1) << "thread ids retval: " << sls::ToString(retval);
return socket.sendResult(retval);
}

View File

@ -155,6 +155,7 @@ class ClientInterface : private virtual slsDetectorDefs {
int get_progress(sls::ServerInterface &socket);
int set_num_gates(sls::ServerInterface &socket);
int set_gate_delay(sls::ServerInterface &socket);
int get_thread_ids(sls::ServerInterface &socket);
Implementation *impl() {
if (receiver != nullptr) {
@ -179,4 +180,7 @@ class ClientInterface : private virtual slsDetectorDefs {
void (*rawDataModifyReadyCallBack)(char *, char *, uint32_t &,
void *) = nullptr;
void *pRawDataReady{nullptr};
pid_t parentThreadId{0};
pid_t tcpThreadId{0};
};

View File

@ -31,48 +31,29 @@ HDF5File::HDF5File(int ind, uint32_t *maxf, int *nd, std::string *fname,
parameterNames.clear();
parameterDataTypes.clear();
parameterNames.push_back("frame number");
parameterDataTypes.push_back(PredType::STD_U64LE);
parameterNames.push_back("exp length or sub exposure time");
parameterDataTypes.push_back(PredType::STD_U32LE);
parameterNames.push_back("packets caught");
parameterDataTypes.push_back(PredType::STD_U32LE);
parameterNames.push_back("bunch id");
parameterDataTypes.push_back(PredType::STD_U64LE);
parameterNames.push_back("timestamp");
parameterDataTypes.push_back(PredType::STD_U64LE);
parameterNames.push_back("mod id");
parameterDataTypes.push_back(PredType::STD_U16LE);
parameterNames.push_back("row");
parameterDataTypes.push_back(PredType::STD_U16LE);
parameterNames.push_back("column");
parameterDataTypes.push_back(PredType::STD_U16LE);
parameterNames.push_back("reserved");
parameterDataTypes.push_back(PredType::STD_U16LE);
parameterNames.push_back("debug");
parameterDataTypes.push_back(PredType::STD_U32LE);
parameterNames.push_back("round robin number");
parameterDataTypes.push_back(PredType::STD_U16LE);
parameterNames.push_back("detector type");
parameterDataTypes.push_back(PredType::STD_U8LE);
parameterNames.push_back("detector header version");
parameterDataTypes.push_back(PredType::STD_U8LE);
parameterNames.push_back("packets caught bit mask");
parameterNames = std::vector<std::string>{
"frame number",
"exp length or sub exposure time",
"packets caught",
"bunch id",
"timestamp",
"mod id",
"row",
"column",
"reserved",
"debug",
"round robin number",
"detector type",
"detector header version",
"packets caught bit mask",
};
StrType strdatatype(PredType::C_S1, sizeof(bitset_storage));
parameterDataTypes.push_back(strdatatype);
parameterDataTypes = std::vector<DataType>{
PredType::STD_U64LE, PredType::STD_U32LE, PredType::STD_U32LE,
PredType::STD_U64LE, PredType::STD_U64LE, PredType::STD_U16LE,
PredType::STD_U16LE, PredType::STD_U16LE, PredType::STD_U16LE,
PredType::STD_U32LE, PredType::STD_U16LE, PredType::STD_U8LE,
PredType::STD_U8LE, strdatatype};
}
HDF5File::~HDF5File() { CloseAllFiles(); }
@ -461,9 +442,9 @@ void HDF5File::CreateDataFile() {
paralist.setChunk(1, chunkpara_dims);
for (unsigned int i = 0; i < parameterNames.size(); ++i) {
DataSet *ds = new DataSet(
filefd->createDataSet(parameterNames[i], parameterDataTypes[i],
*dataspace_para, paralist));
DataSet *ds = new DataSet(filefd->createDataSet(
parameterNames[i].c_str(), parameterDataTypes[i],
*dataspace_para, paralist));
dataset_para.push_back(ds);
}
} catch (const Exception &error) {
@ -1011,7 +992,7 @@ void HDF5File::CreateVirtualDataFile(uint32_t maxFramesPerFile, uint64_t numf) {
for (unsigned int k = 0; k < parameterNames.size(); ++k) {
if (H5Pset_virtual(dcpl_para[k], vdsDataspace_para,
relative_srcFileName.c_str(),
parameterNames[k],
parameterNames[k].c_str(),
srcDataspace_para) < 0) {
throw sls::RuntimeError(
"Could not set mapping for paramter " +
@ -1043,7 +1024,7 @@ void HDF5File::CreateVirtualDataFile(uint32_t maxFramesPerFile, uint64_t numf) {
// virtual parameter dataset
for (unsigned int i = 0; i < parameterNames.size(); ++i) {
hid_t vdsdataset_para = H5Dcreate2(
virtualfd, parameterNames[i],
virtualfd, parameterNames[i].c_str(),
GetDataTypeinC(parameterDataTypes[i]), vdsDataspace_para,
H5P_DEFAULT, dcpl_para[i], H5P_DEFAULT);
if (vdsdataset_para < 0)
@ -1134,9 +1115,8 @@ void HDF5File::LinkVirtualInMaster(std::string fname, std::string dsetname) {
(std::string(parameterNames[i])).c_str());
if (H5Lcreate_external(relative_virtualfname.c_str(),
(std::string(parameterNames[i])).c_str(),
mfd, linkname, H5P_DEFAULT,
H5P_DEFAULT) < 0) {
parameterNames[i].c_str(), mfd, linkname,
H5P_DEFAULT, H5P_DEFAULT) < 0) {
H5Fclose(mfd);
mfd = 0;
throw sls::RuntimeError(

View File

@ -84,7 +84,7 @@ class HDF5File : private virtual slsDetectorDefs, public File {
uint64_t numActualPacketsInFile;
int numFilesinAcquisition;
std::vector<const char *> parameterNames;
std::vector<std::string> parameterNames;
std::vector<DataType> parameterDataTypes;
DataSpace *dataspace_para;
std::vector<DataSet *> dataset_para;

View File

@ -445,6 +445,37 @@ void Implementation::setFramePaddingEnable(const bool i) {
LOG(logINFO) << "Frame Padding: " << framePadding;
}
void Implementation::setThreadIds(const pid_t parentTid, const pid_t tcpTid) {
parentThreadId = parentTid;
tcpThreadId = tcpTid;
}
std::array<pid_t, NUM_RX_THREAD_IDS> Implementation::getThreadIds() const {
LOG(logDEBUG3) << __SHORT_AT__ << " called";
std::array<pid_t, NUM_RX_THREAD_IDS> retval{};
int id = 0;
retval[id++] = parentThreadId;
retval[id++] = tcpThreadId;
retval[id++] = listener[0]->GetThreadId();
retval[id++] = dataProcessor[0]->GetThreadId();
if (dataStreamEnable) {
retval[id++] = dataStreamer[0]->GetThreadId();
} else {
retval[id++] = 0;
}
if (numThreads == 2) {
retval[id++] = listener[1]->GetThreadId();
retval[id++] = dataProcessor[1]->GetThreadId();
if (dataStreamEnable) {
retval[id++] = dataStreamer[1]->GetThreadId();
} else {
retval[id++] = 0;
}
}
return retval;
}
/**************************************************
* *
* File Parameters *

View File

@ -42,6 +42,8 @@ class Implementation : private virtual slsDetectorDefs {
void setFrameDiscardPolicy(const frameDiscardPolicy i);
bool getFramePaddingEnable() const;
void setFramePaddingEnable(const bool i);
void setThreadIds(const pid_t parentTid, const pid_t tcpTid);
std::array<pid_t, NUM_RX_THREAD_IDS> getThreadIds() const;
/**************************************************
* *
@ -269,6 +271,8 @@ class Implementation : private virtual slsDetectorDefs {
uint32_t fifoDepth;
frameDiscardPolicy frameDiscardMode;
bool framePadding;
pid_t parentThreadId;
pid_t tcpThreadId;
// file parameters
fileFormat fileFormatType;

View File

@ -28,6 +28,8 @@ ThreadObject::~ThreadObject() {
sem_destroy(&semaphore);
}
pid_t ThreadObject::GetThreadId() const { return threadId; }
bool ThreadObject::IsRunning() const { return runningFlag; }
void ThreadObject::StartRunning() { runningFlag = true; }
@ -35,8 +37,9 @@ void ThreadObject::StartRunning() { runningFlag = true; }
void ThreadObject::StopRunning() { runningFlag = false; }
void ThreadObject::RunningThread() {
threadId = syscall(SYS_gettid);
LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index
<< ", Tid: " << syscall(SYS_gettid) << "]";
<< ", Tid: " << threadId << "]";
while (!killThread) {
while (IsRunning()) {
ThreadExecution();
@ -45,7 +48,8 @@ void ThreadObject::RunningThread() {
sem_wait(&semaphore);
}
LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index
<< ", Tid: " << syscall(SYS_gettid) << "]";
<< ", Tid: " << threadId << "]";
threadId = 0;
}
void ThreadObject::Continue() { sem_post(&semaphore); }

View File

@ -25,10 +25,12 @@ class ThreadObject : private virtual slsDetectorDefs {
std::thread threadObject;
sem_t semaphore;
const std::string type;
pid_t threadId{0};
public:
ThreadObject(int threadIndex, std::string threadType);
virtual ~ThreadObject();
pid_t GetThreadId() const;
bool IsRunning() const;
void StartRunning();
void StopRunning();