adds a POC preset based count

This commit is contained in:
2025-10-31 13:23:55 +01:00
parent b9e5f40c21
commit 1e853487aa
6 changed files with 460 additions and 62 deletions

View File

@@ -90,16 +90,44 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
char pv_name_buffer[100];
P_Counts = new int[this->num_channels];
asynStatus status;
asynStatus status = asynSuccess;
status = (asynStatus)(status | createParam(P_StatusString, asynParamInt32,
&P_Status));
status = (asynStatus)(status | setIntegerParam(P_Status, STATUS_IDLE));
status = (asynStatus)(status |
createParam(P_ResetString, asynParamInt32, &P_Reset));
status = (asynStatus)(status | setIntegerParam(P_Reset, 0));
status = (asynStatus)(status | createParam(P_CountPresetString,
asynParamInt32, &P_CountPreset));
status = (asynStatus)(status | setIntegerParam(P_CountPreset, 0));
status =
(asynStatus)(status | createParam(P_MonitorChannelString,
asynParamInt32, &P_MonitorChannel));
status = (asynStatus)(status | setIntegerParam(P_MonitorChannel, 0));
// Create PVs templated on Channel Number
for (size_t i = 0; i < this->num_channels; ++i) {
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
status = createParam(pv_name_buffer, asynParamInt32, P_Counts + i);
setIntegerParam(P_Counts[i], 0);
status =
(asynStatus)(status | createParam(pv_name_buffer, asynParamInt32,
P_Counts + i));
status = (asynStatus)(status | setIntegerParam(P_Counts[i], 0));
}
if (status) {
printf("%s:%s: failed to create or setup parameters, status=%d\n",
driverName, functionName, status);
exit(1);
}
// Create Events
this->pausedEventId = epicsEventCreate(epicsEventEmpty);
this->monitorProducer = create_kafka_producer();
this->detectorProducer = create_kafka_producer();
@@ -160,6 +188,53 @@ asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
}
asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
epicsInt32 value) {
int function = pasynUser->reason;
asynStatus status = asynSuccess;
const char *paramName;
const char *functionName = "writeInt32";
getParamName(function, &paramName);
// if (status) {
// epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
// "%s:%s: status=%d, function=%d, name=%s, value=%d",
// driverName, functionName, status, function, paramName,
// value);
// return status;
// }
if (function == P_CountPreset) {
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
epicsEventSignal(this->pausedEventId);
} else if (function == P_Reset) {
// TODO should probably set back everything to defaults
setIntegerParam(P_Status, STATUS_IDLE);
status = (asynStatus)callParamCallbacks();
} else if (function == P_MonitorChannel) {
epicsInt32 currentStatus;
getIntegerParam(this->P_Status, &currentStatus);
if (!currentStatus) {
setIntegerParam(function, value);
status = (asynStatus)callParamCallbacks();
}
} else {
setIntegerParam(function, value);
status = (asynStatus)callParamCallbacks();
}
if (status)
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
"%s:%s: status=%d, function=%d, name=%s, value=%d",
driverName, functionName, status, function, paramName,
value);
return status;
}
// TODO probably I will have to split this function up, so that the system
// can process the UDP messages in parallel
void asynStreamGeneratorDriver::receiveUDP() {
asynStatus status;
int isConnected;
@@ -170,56 +245,53 @@ void asynStreamGeneratorDriver::receiveUDP() {
int eomReason;
epicsInt32 val;
epicsInt32 currentStatus;
epicsInt32 countPreset = 0;
epicsInt32 presetChannel = 1;
const uint32_t x_pixels = 128;
const uint32_t y_pixels = 128;
const char *functionName = "receiveUDP";
// TODO epics doesn't seem to support uint64, you would need an array of
// uint32. It does support int64 though.. so we start with that
epicsInt32 *counts = new epicsInt32[this->num_channels];
while (true) {
// memset doesn't work with epicsInt32
for (size_t i = 0; i < this->num_channels; ++i) {
counts[i] = 0;
status = getIntegerParam(this->P_Status, &currentStatus);
if (!currentStatus || status) {
epicsEventWait(this->pausedEventId);
getIntegerParam(this->P_CountPreset, &countPreset);
getIntegerParam(this->P_MonitorChannel, &presetChannel);
// memset doesn't work with epicsInt32
for (size_t i = 0; i < this->num_channels; ++i) {
counts[i] = 0;
}
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
callParamCallbacks();
unlock();
// Clear the input buffer, in case of stray messages
pasynOctetSyncIO->flush(pasynUDPUser);
}
status = pasynManager->isConnected(pasynUDPUser, &isConnected);
if (status) {
if (!isConnected)
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: error calling pasynManager->isConnected, "
"status=%d, error=%s\n",
driverName, "receiveUDP", status,
pasynUDPUser->errorMessage);
// driverName, functionName, status,
// pasynUserIPPort_->errorMessage);
}
asynPrint(pasynUserSelf, ASYN_TRACEIO_DRIVER,
"%s:%s: isConnected = %d\n", //
driverName, "receiveUDP", isConnected);
"%s:%s: isConnected = %d\n", driverName, functionName,
isConnected);
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, buffer_size,
0, // timeout
&received, &eomReason);
// if (status)
// asynPrint(
// pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: error calling pasynOctetSyncIO->read, status=%d\n",
// driverName, "receiveUDP", status);
// buffer[received] = 0;
if (received) {
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received %f %d
// received\n",
// driverName, "receiveUDP", (double) received /
// 1500., received);
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received
// %d\n",
// driverName, "receiveUDP", received);
UDPHeader *header = (UDPHeader *)buffer;
size_t total_events = (header->BufferLength - 21) / 3;
@@ -230,23 +302,19 @@ void asynStreamGeneratorDriver::receiveUDP() {
// "%s:%s: received packet %d with %d events (%"
// PRIu64
// ")\n",
// driverName, "receiveUDP",
// driverName, functionName,
// header->BufferNumber, total_events,
// header->nanosecs());
for (size_t i = 0; i < total_events; ++i) {
char *event = (buffer + 21 * 2 + i * 6);
if (countPreset && counts[presetChannel] >= countPreset)
break;
if (event[5] & 0x80) { // Monitor Event
MonitorEvent *m_event = (MonitorEvent *)event;
// asynPrint(
// pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: event (%03d) on monitor %d (%" PRIu64
// ")\n", driverName, "receiveUDP", i,
// m_event->DataID, header->nanosecs() +
// (uint64_t)m_event->nanosecs());
counts[m_event->DataID + 1] += 1;
// needs to be freed!!!
@@ -264,25 +332,11 @@ void asynStreamGeneratorDriver::receiveUDP() {
auto nde = new NormalisedDetectorEvent();
nde->TimeStamp =
header->nanosecs() + (uint64_t)d_event->nanosecs();
nde->PixID =
(header->McpdID - 1) * x_pixels * y_pixels +
x_pixels * (uint32_t)d_event->XPosition +
(uint32_t)d_event->YPosition;
nde->PixID = d_event->pixelId(header->McpdID);
this->detectorQueue.push(nde);
}
}
for (size_t i = 0; i < num_channels; ++i) {
getIntegerParam(P_Counts[i], &val);
counts[i] += val;
}
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: det: (%d), mon0: (%d), mon1: (%d), mon2: "
// "(%d), mon3: (%d)\n",
// driverName, "receiveUDP", counts[0],
// counts[1], counts[2], counts[3], counts[4]);
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
@@ -292,7 +346,15 @@ void asynStreamGeneratorDriver::receiveUDP() {
} else {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: invalid UDP packet\n", driverName,
"receiveUDP");
functionName);
}
if (countPreset && counts[presetChannel] >= countPreset) {
lock();
setIntegerParam(P_Status, STATUS_IDLE);
setIntegerParam(P_CountPreset, 0);
callParamCallbacks();
unlock();
}
}