diff --git a/scripts/st.cmd b/scripts/st.cmd index 80ea698..5d8db12 100755 --- a/scripts/st.cmd +++ b/scripts/st.cmd @@ -15,5 +15,6 @@ dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=1") dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=2") dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=3") +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=4") iocInit() diff --git a/scripts/udp_gen.py b/scripts/udp_gen.py index 428e2ad..fc51e17 100644 --- a/scripts/udp_gen.py +++ b/scripts/udp_gen.py @@ -54,19 +54,50 @@ while True: tosend = list(header) - for i in range(num_events): - d = list(data) + # I believe that in our case we never mix monitor and detector events, as + # the monitors should have id 0 and the detector events 1-9, so I have + # excluded that possibility here. If that is true, it would also mean we + # could reduce the number of checks on the parsing side of things... 
- # set monitor - d[5] = (1 << 7) | random.randint(0,3) + is_monitor = random.randint(0, 9) - # update trigger timestamp - event_timestamp = (time.time_ns() // 100) - base_timestamp - d[0] = event_timestamp & 0xff - d[1] = (event_timestamp >> 8) & 0xff - d[2] = (event_timestamp >> 16) & 0x07 + if is_monitor > 3: - tosend += d + for i in range(num_events): + d = list(data) + + monitor = random.randint(0,3) + + d[5] = (1 << 7) | monitor + + # update trigger timestamp + event_timestamp = (time.time_ns() // 100) - base_timestamp + d[0] = event_timestamp & 0xff + d[1] = (event_timestamp >> 8) & 0xff + d[2] = (event_timestamp >> 16) & 0x07 + + tosend += d + + else: + + for i in range(num_events): + d = list(data) + + amplitude = random.randint(0, 255) + x_pos = random.randint(0, 1023) + y_pos = random.randint(0, 1023) + event_timestamp = (time.time_ns() // 100) - base_timestamp + + d[5] = (0 << 7) | (amplitude >> 1) + d[4] = ((amplitude & 0x01) << 7) | (y_pos >> 3) + d[3] = ((y_pos << 5) & 0xE0) | (x_pos >> 5) + d[2] = ((x_pos << 3) & 0xF8) + + d[0] = event_timestamp & 0xff + d[1] = (event_timestamp >> 8) & 0xff + d[2] |= (event_timestamp >> 16) & 0x07 + + tosend += d sock.sendto(bytes(tosend), ('127.0.0.1', 54321)) mv = memoryview(bytes(header)).cast('H') diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index ecf0553..e8838cf 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -35,6 +35,11 @@ static void monitorProducerTask(void *drvPvt) { pSGD->produceMonitor(); } +static void detectorProducerTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->produceDetector(); +} + // UDP Packet Definitions struct __attribute__((__packed__)) UDPHeader { uint16_t BufferLength; @@ -93,14 +98,15 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, 1, /* Autoconnect */ 0, /* Default priority */ 0), /* Default stack size*/ - 
num_channels(numChannels), monitorQueue(1000, false) { + num_channels(numChannels + 1), monitorQueue(1000, false), + detectorQueue(1000, false) { // Parameter Setup char pv_name_buffer[100]; - P_Counts = new int[numChannels]; + P_Counts = new int[this->num_channels]; asynStatus status; - for (size_t i = 0; i < numChannels; ++i) { + for (size_t i = 0; i < this->num_channels; ++i) { memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_CountsString, i); status = createParam(pv_name_buffer, asynParamInt32, P_Counts + i); @@ -115,7 +121,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, set_config(conf, "bootstrap.servers", "linkafka01:9092"); set_config(conf, "queue.buffering.max.messages", "1e7"); - // Create the Producer instance. + // Create the Monitor Producer instance. this->monitorProducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!this->monitorProducer) { @@ -124,6 +130,19 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, exit(1); } + conf = rd_kafka_conf_new(); + set_config(conf, "bootstrap.servers", "linkafka01:9092"); + set_config(conf, "queue.buffering.max.messages", "1e7"); + + // Create the Detector Producer instance. 
+ this->detectorProducer = + rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + if (!this->detectorProducer) { + // TODO + // g_error("Failed to create new producer: %s", errstr); + exit(1); + } + // Setup for Thread Producing Monitor Kafka Events status = (asynStatus)(epicsThreadCreate( @@ -138,6 +157,20 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, return; } + // Setup for Thread Producing Detector Kafka Events + status = (asynStatus)(epicsThreadCreate( + "detector_produce", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::detectorProducerTask, + this) == NULL); + if (status) { + // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + // functionName, status); + printf("%s:%s: epicsThreadCreate failure, status=%d\n", + "StreamGenerator", "init", status); + return; + } + // UDP Receive Setup pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); @@ -154,143 +187,51 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, return; } - // // Kafka Produce Setup - // rd_kafka_conf_t *conf; - // char errstr[512]; - - // // Create client configuration - // conf = rd_kafka_conf_new(); - // set_config(conf, "bootstrap.servers", "linkafka01:9092"); - // set_config(conf, "queue.buffering.max.messages", "1e7"); - - // // Create the Producer instance. - // rd_kafka_t *producer = - // rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); - // if (!producer) { - // // TODO - // // g_error("Failed to create new producer: %s", errstr); - // exit(1); - // } - - // char *msg = "asdf\n"; - // // EventMessageBuilder b; - // // We could I believe reuse a buffer which might be more performant. 
- // flatbuffers::FlatBufferBuilder builder(1024); - // // clear with build.Clear(); - // std::vector tof = {1, 2, 3}; - // std::vector did = {0, 0, 0}; - // auto message = - // CreateEventMessageDirect(builder, "monitor1", 0, 0, &tof, &did); - - // builder.Finish(message, "ev42"); - // printf("buffer size: %d\n", builder.GetSize()); - - // rd_kafka_resp_err_t err = rd_kafka_producev( - // producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"), - // RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - // // RD_KAFKA_V_KEY((void *)key, key_len), - // RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), - // builder.GetSize()), - // // RD_KAFKA_V_OPAQUE(NULL), - // RD_KAFKA_V_END); - - // if (err) { - // // TODO - // // g_error("Failed to produce to topic %s: %s", topic, - // // rd_kafka_err2str(err)); - // exit(1); - // } - - // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); - + // TODO add exit should perhaps ensure the queue is flushed // rd_kafka_poll(producer, 0); // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); // rd_kafka_flush(producer, 10 * 1000); // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); } -asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { delete[] P_Counts; } - -// TODO pretty sure I don't actually need to overwrite this -asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, - epicsInt32 *value) { - // asynStatus asynStreamGeneratorDriver::readInt64(asynUser *pasynUser, - // epicsInt64 *value) { - - const char *paramName; - int function = pasynUser->reason; - asynStatus status; - - // TODO not freed - getParamName(function, ¶mName); - - bool is_p_counts = false; - for (size_t i = 0; i < num_channels; ++i) { - is_p_counts = is_p_counts | function == P_Counts[i]; - } - - if (is_p_counts) { - status = getIntegerParam(function, value); - - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: function %d %s %d\n", - "StreamGenerator", "readInt64", function, paramName, status); 
- // return status; - return asynSuccess; - } else { - return asynError; - } - return asynSuccess; +asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { + // should make sure queues are empty and freed + // and that the kafka producers are flushed and freed + delete[] P_Counts; } -void asynStreamGeneratorDriver::produceMonitor() { - - flatbuffers::FlatBufferBuilder builder(1024); - - while (true) { - - if (!this->monitorQueue.isEmpty()) { - - builder.Clear(); - auto nme = this->monitorQueue.pop(); - - std::vector tof = {nme->TimeStamp}; - std::vector did = {nme->DataID}; - - auto message = - CreateEventMessageDirect(builder, "monitor", 0, 0, &tof, &did); - - builder.Finish(message, "ev42"); - // printf("buffer size: %d\n", builder.GetSize()); - - rd_kafka_resp_err_t err = rd_kafka_producev( - monitorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"), - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - // RD_KAFKA_V_KEY((void *)key, key_len), - RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), - builder.GetSize()), - // RD_KAFKA_V_OPAQUE(NULL), - RD_KAFKA_V_END); - - if (err) { - // TODO - // g_error("Failed to produce to topic %s: %s", topic, - // rd_kafka_err2str(err)); - } - - // epicsStdoutPrintf("Kafka Queue Size %d\n", - // rd_kafka_outq_len(monitorProducer)); - - rd_kafka_poll(monitorProducer, 0); - - printf("Monitor Events Queued %d\n", this->monitorQueue.getHighWaterMark()); - this->monitorQueue.resetHighWaterMark(); - - delete nme; - } - - epicsThreadSleep(0.001); // seconds - } -} +// // TODO pretty sure I don't actually need to overwrite this +// asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, +// epicsInt32 *value) { +// // asynStatus asynStreamGeneratorDriver::readInt64(asynUser *pasynUser, +// // epicsInt64 *value) { +// +// const char *paramName; +// int function = pasynUser->reason; +// asynStatus status; +// +// // TODO not freed +// getParamName(function, ¶mName); +// +// bool is_p_counts = false; +// for (size_t i = 0; i < 
num_channels; ++i) { +// is_p_counts = is_p_counts | function == P_Counts[i]; +// } +// +// if (is_p_counts) { +// status = getIntegerParam(function, value); +// +// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: function %d %s +// %d\n", +// "StreamGenerator", "readInt64", function, paramName, +// status); +// // return status; +// return asynSuccess; +// } else { +// return asynError; +// } +// return asynSuccess; +// } void asynStreamGeneratorDriver::receiveUDP() { asynStatus status; @@ -302,14 +243,17 @@ void asynStreamGeneratorDriver::receiveUDP() { epicsInt32 val; + const uint32_t x_pixels = 128; + const uint32_t y_pixels = 128; + // TODO epics doesn't seem to support uint64, you would need an array of // uint32. It does support int64 though.. so we start with that - epicsInt32 *monitor_counts = new epicsInt32[this->num_channels]; + epicsInt32 *counts = new epicsInt32[this->num_channels]; while (true) { // memset doesn't work with epicsInt32 for (size_t i = 0; i < this->num_channels; ++i) { - monitor_counts[i] = 0; + counts[i] = 0; } // epicsStdoutPrintf("polling!!"); @@ -368,9 +312,9 @@ void asynStreamGeneratorDriver::receiveUDP() { // m_event->DataID, header->nanosecs() + // (uint64_t)m_event->nanosecs()); - monitor_counts[m_event->DataID] += 1; + counts[m_event->DataID + 1] += 1; - // TODO needs to be freed + // needs to be freed!!! auto nme = new NormalisedMonitorEvent(); nme->TimeStamp = header->nanosecs() + (uint64_t)m_event->nanosecs(); @@ -379,24 +323,34 @@ void asynStreamGeneratorDriver::receiveUDP() { } else { // Detector Event DetectorEvent *d_event = (DetectorEvent *)event; + counts[0] += 1; + + // needs to be freed!!! 
+ auto nde = new NormalisedDetectorEvent(); + nde->TimeStamp = + header->nanosecs() + (uint64_t)d_event->nanosecs(); + nde->PixID = + (header->McpdID - 1) * x_pixels * y_pixels + + x_pixels * (uint32_t)d_event->XPosition + + (uint32_t)d_event->YPosition; + this->detectorQueue.push(nde); } } for (size_t i = 0; i < num_channels; ++i) { getIntegerParam(P_Counts[i], &val); - monitor_counts[i] += val; + counts[i] += val; } asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: monitor 0: (%d), monitor 1: (%d), monitor 2: " - "(%d), monitor 3: (%d)\n", - "StreamGenerator", "receiveUDP", monitor_counts[0], - monitor_counts[1], monitor_counts[2], - monitor_counts[3]); + "%s:%s: det: (%d), mon0: (%d), mon1: (%d), mon2: " + "(%d), mon3: (%d)\n", + "StreamGenerator", "receiveUDP", counts[0], counts[1], + counts[2], counts[3], counts[4]); lock(); for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], monitor_counts[i]); + setIntegerParam(P_Counts[i], counts[i]); } callParamCallbacks(); unlock(); @@ -411,6 +365,161 @@ void asynStreamGeneratorDriver::receiveUDP() { } } +void asynStreamGeneratorDriver::produceMonitor() { + + flatbuffers::FlatBufferBuilder builder(1024); + + std::vector tof; + tof.reserve(9000); + + std::vector did; + did.reserve(9000); + + int total = 0; + epicsTimeStamp last_sent = epicsTime::getCurrent(); + + uint64_t message_id = 0; + + while (true) { + + if (!this->monitorQueue.isEmpty()) { + + ++total; + auto nme = this->monitorQueue.pop(); + tof.push_back(nme->TimeStamp); + did.push_back(nme->DataID); + delete nme; + + } else { + epicsThreadSleep(0.001); // seconds + } + + epicsTimeStamp now = epicsTime::getCurrent(); + + // At least every 0.2 seconds + if (total >= 8192 || + epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) { + last_sent = epicsTime::getCurrent(); + + if (total) { + total = 0; + + builder.Clear(); + + auto message = CreateEventMessageDirect( + builder, "monitor", message_id++, 0, &tof, &did); + + 
builder.Finish(message, "ev42"); + // printf("buffer size: %d\n", builder.GetSize()); + + rd_kafka_resp_err_t err = rd_kafka_producev( + monitorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + // RD_KAFKA_V_KEY((void *)key, key_len), + RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), + builder.GetSize()), + // RD_KAFKA_V_OPAQUE(NULL), + RD_KAFKA_V_END); + + if (err) { + // TODO + // g_error("Failed to produce to topic %s: %s", topic, + // rd_kafka_err2str(err)); + } + + // epicsStdoutPrintf("Kafka Queue Size %d\n", + // rd_kafka_outq_len(monitorProducer)); + + rd_kafka_poll(monitorProducer, 0); + + printf("Monitor Events Queued before sending %d\n", + this->monitorQueue.getHighWaterMark()); + this->monitorQueue.resetHighWaterMark(); + + tof.clear(); + did.clear(); + } + } + } +} + +void asynStreamGeneratorDriver::produceDetector() { + + flatbuffers::FlatBufferBuilder builder(1024); + + std::vector tof; + tof.reserve(9000); + + std::vector did; + did.reserve(9000); + + int total = 0; + epicsTimeStamp last_sent = epicsTime::getCurrent(); + + uint64_t message_id = 0; + + while (true) { + + if (!this->detectorQueue.isEmpty()) { + + ++total; + auto nde = this->detectorQueue.pop(); + tof.push_back(nde->TimeStamp); + did.push_back(nde->PixID); + delete nde; + } else { + epicsThreadSleep(0.001); // seconds + } + + epicsTimeStamp now = epicsTime::getCurrent(); + + // At least every 0.2 seconds + if (total >= 8192 || + epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) { + last_sent = epicsTime::getCurrent(); + + if (total) { + total = 0; + + builder.Clear(); + + auto message = CreateEventMessageDirect( + builder, "detector", message_id++, 0, &tof, &did); + + builder.Finish(message, "ev42"); + // printf("buffer size: %d\n", builder.GetSize()); + + rd_kafka_resp_err_t err = rd_kafka_producev( + detectorProducer, RD_KAFKA_V_TOPIC("NEWEFU_TEST2"), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + // RD_KAFKA_V_KEY((void *)key, key_len), 
+ RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), + builder.GetSize()), + // RD_KAFKA_V_OPAQUE(NULL), + RD_KAFKA_V_END); + + if (err) { + // TODO + // g_error("Failed to produce to topic %s: %s", topic, + // rd_kafka_err2str(err)); + } + + // epicsStdoutPrintf("Kafka Queue Size %d\n", + // rd_kafka_outq_len(monitorProducer)); + + rd_kafka_poll(detectorProducer, 0); + + printf("Detector Events Queued before sending %d\n", + this->detectorQueue.getHighWaterMark()); + this->detectorQueue.resetHighWaterMark(); + + tof.clear(); + did.clear(); + } + } + } +} + /* Configuration routine. Called directly, or from the iocsh function below */ extern "C" { diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index c1f00cd..6ee1c6b 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -7,7 +7,12 @@ struct __attribute__((__packed__)) NormalisedMonitorEvent { uint64_t TimeStamp; - uint32_t DataID : 4; + uint8_t DataID : 4; +}; + +struct __attribute__((__packed__)) NormalisedDetectorEvent { + uint64_t TimeStamp; + uint32_t PixID; }; /* These are the drvInfo strings that are used to identify the parameters. */ @@ -20,10 +25,11 @@ class asynStreamGeneratorDriver : public asynPortDriver { virtual ~asynStreamGeneratorDriver(); // virtual asynStatus readInt64(asynUser *pasynUser, epicsInt64 *value); - virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value); + // virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value); void receiveUDP(); void produceMonitor(); + void produceDetector(); protected: int *P_Counts; @@ -34,6 +40,9 @@ class asynStreamGeneratorDriver : public asynPortDriver { epicsRingPointer monitorQueue; rd_kafka_t *monitorProducer; + + epicsRingPointer detectorQueue; + rd_kafka_t *detectorProducer; }; #endif