Add request log queue and adapt to LogItem api

This commit is contained in:
Dominik Werder
2025-06-18 13:08:54 +02:00
parent b1c3ade769
commit f81bd3496b
8 changed files with 368 additions and 35 deletions

View File

@@ -32,6 +32,7 @@ items_0 = { path = "../daqbuf-items-0", package = "daqbuf-items-0" }
items_2 = { path = "../daqbuf-items-2", package = "daqbuf-items-2" }
parse = { path = "../daqbuf-parse", package = "daqbuf-parse" }
series = { path = "../daqbuf-series", package = "daqbuf-series" }
kanal = "0.1.1"
[dev-dependencies]
tokio = { version = "1", features = ["rt"] }

View File

@@ -96,21 +96,21 @@ where
}
},
StreamItem::Log(item) => {
match item.level {
match item.level() {
Level::TRACE => {
trace!("{:?}", item);
trace!("{}", item.display_log_file());
}
Level::DEBUG => {
debug!("{:?}", item);
debug!("{}", item.display_log_file());
}
Level::INFO => {
info!("{:?}", item);
info!("{}", item.display_log_file());
}
Level::WARN => {
warn!("{:?}", item);
warn!("{}", item.display_log_file());
}
Level::ERROR => {
error!("{:?}", item);
error!("{}", item.display_log_file());
}
}
let item = CborBytes::new(Bytes::new());
@@ -240,7 +240,7 @@ impl<S> FramedBytesToChannelEventsStream<S> {
if let Some(x) = item {
Ok(Some(Ok(x)))
} else {
let item = LogItem::from_node(0, Level::DEBUG, format!("decoded ciborium Value"));
let item = LogItem::from_node(Level::DEBUG, format!("decoded ciborium Value"));
Ok(Some(Ok(StreamItem::Log(item))))
}
}

View File

@@ -117,16 +117,22 @@ where
}
},
StreamItem::Log(item) => {
if item.level == Level::ERROR {
error!("node {} msg {}", item.node_ix, item.msg);
} else if item.level == Level::WARN {
warn!("node {} msg {}", item.node_ix, item.msg);
} else if item.level == Level::INFO {
info!("node {} msg {}", item.node_ix, item.msg);
} else if item.level == Level::DEBUG {
debug!("node {} msg {}", item.node_ix, item.msg);
} else if item.level == Level::TRACE {
trace!("node {} msg {}", item.node_ix, item.msg);
match item.level() {
Level::ERROR => {
error!("{}", item.display_log_file());
}
Level::WARN => {
warn!("{}", item.display_log_file());
}
Level::INFO => {
info!("{}", item.display_log_file());
}
Level::DEBUG => {
debug!("{}", item.display_log_file());
}
Level::TRACE => {
trace!("{}", item.display_log_file());
}
}
Ok(())
}

View File

@@ -15,6 +15,8 @@ pub mod itemclone;
pub mod json_stream;
pub mod lenframe;
pub mod lenframed;
pub mod logfilter;
pub mod logqueue;
pub mod needminbuffer;
pub mod plaineventscbor;
pub mod plaineventsjson;

29
src/logfilter.rs Normal file
View File

@@ -0,0 +1,29 @@
use futures_util::Stream;
use items_0::streamitem::AsLogItem;
use std::fmt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub struct LogFilter<S> {
inp: S,
}
impl<S> fmt::Debug for LogFilter<S> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("LogFilter").field("name", &"..").finish()
}
}
impl<S> Stream for LogFilter<S>
where
S: Stream,
<S as Stream>::Item: AsLogItem,
{
type Item = <S as Stream>::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
Pending
}
}

281
src/logqueue.rs Normal file
View File

@@ -0,0 +1,281 @@
use crate::log;
use async_channel::Receiver;
use async_channel::Sender;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::FromLogItem;
use items_0::streamitem::LogItem;
use std::cell::RefCell;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
autoerr::create_error_v1!(
name(PushError, "PushError"),
enum variants {
Full,
},
);
pub struct RequestLogItemBuffer {
buf: VecDeque<LogItem>,
}
impl RequestLogItemBuffer {
pub fn new() -> Self {
Self {
buf: VecDeque::new(),
}
}
pub fn push_back(&mut self, item: LogItem) -> Result<(), PushError> {
self.buf.push_back(item);
Ok(())
}
}
pub struct LogQueueFutureWrap<F> {
reqid: String,
fut: Pin<Box<F>>,
logbuf: RequestLogItemBuffer,
}
impl<F> LogQueueFutureWrap<F> {
pub fn new(reqid: String, fut: F) -> Self {
let fut = Box::pin(fut);
let logbuf = RequestLogItemBuffer::new();
Self { reqid, fut, logbuf }
}
}
impl<F> Future for LogQueueFutureWrap<F>
where
F: Future,
{
type Output = <F as Future>::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let self2 = self.as_mut().get_mut();
LOG_QUEUE_3
.try_with(|x| {
let mut h = x.try_borrow_mut().unwrap();
h.logbuf = &mut self2.logbuf as *mut _;
h.push_v0 = log_push_v0;
})
.unwrap();
let ret = self2.fut.poll_unpin(cx);
LOG_QUEUE_3
.try_with(|x| {
let mut h = x.try_borrow_mut().unwrap();
h.push_v0 = log_push_noop;
})
.unwrap();
ret
}
}
thread_local! {
static LOG_QUEUE_0: RefCell<VecDeque<u32>> = const { RefCell::new(VecDeque::new()) };
static LOG_QUEUE_1: *mut VecDeque<u32> = const { std::ptr::null_mut() };
static LOG_QUEUE_2: LogQueueFnPtrs = const { LogQueueFnPtrs::default() };
static LOG_QUEUE_3: RefCell<LogQueueFnPtrs> = const { RefCell::new(LogQueueFnPtrs::default()) };
static LOG_QUEUE_4: RefCell<LogQueueFnPtrsB> = const { RefCell::new(LogQueueFnPtrsB::default()) };
}
pub struct LogQueueFnPtrs {
logbuf: *mut RequestLogItemBuffer,
push_v0: fn(*mut RequestLogItemBuffer, LogItem) -> Result<(), PushError>,
}
impl LogQueueFnPtrs {
const fn default() -> Self {
Self {
logbuf: std::ptr::null_mut(),
push_v0: log_push_noop,
}
}
}
pub struct LogQueueFnPtrsB {
log_tx: *const LogItemMuxTx,
push_v0: fn(*const LogItemMuxTx, LogItem) -> Result<(), PushError>,
}
impl LogQueueFnPtrsB {
const fn default() -> Self {
Self {
log_tx: std::ptr::null_mut(),
push_v0: log_tx_send_noop,
}
}
}
fn push_log_item_queue_3(item: LogItem) -> Result<(), PushError> {
LOG_QUEUE_3
.try_with(|x| {
let h = x.try_borrow().unwrap();
(h.push_v0)(h.logbuf, item)
})
.unwrap()
}
fn push_log_item_queue_4(item: LogItem) -> Result<(), PushError> {
LOG_QUEUE_4
.try_with(|x| {
let h = x.try_borrow().unwrap();
(h.push_v0)(h.log_tx, item)
})
.unwrap()
}
pub fn push_log_item(item: LogItem) -> Result<(), PushError> {
push_log_item_queue_4(item)
}
fn log_push_noop(logbuf: *mut RequestLogItemBuffer, item: LogItem) -> Result<(), PushError> {
Ok(())
}
fn log_push_v0(logbuf: *mut RequestLogItemBuffer, item: LogItem) -> Result<(), PushError> {
unsafe { &mut *logbuf }.push_back(item)
}
fn log_tx_send_noop(log_tx: *const LogItemMuxTx, item: LogItem) -> Result<(), PushError> {
Ok(())
}
fn log_tx_send_real(log_tx: *const LogItemMuxTx, item: LogItem) -> Result<(), PushError> {
if let Some(tx) = unsafe { &*log_tx }.log_tx1.as_ref() {
match tx.try_send(item) {
Ok(_) => {}
Err(e) => {
eprintln!("-------------------- log_tx_send_real Err {e}");
}
}
} else {
eprintln!("-------------------- log_tx_send_real NO TX");
}
Ok(())
}
struct LogItemMuxTx {
reqid: String,
log_tx1: Option<Pin<Box<Sender<LogItem>>>>,
log_tx2: Option<kanal::AsyncSender<LogItem>>,
}
pub struct LogItemMux<S> {
tx: UnsafeCell<LogItemMuxTx>,
inp: Option<S>,
log_rx1: Option<Pin<Box<Receiver<LogItem>>>>,
log_rx2: Option<kanal::AsyncReceiver<LogItem>>,
}
impl<S> LogItemMux<S> {
pub fn new(inp: S, reqid: String) -> Self {
let (log_tx1, log_rx1) = async_channel::bounded(4000);
let (log_tx2, log_rx2) = kanal::bounded_async(4000);
Self {
tx: UnsafeCell::new(LogItemMuxTx {
reqid,
log_tx1: Some(Box::pin(log_tx1)),
log_tx2: Some(log_tx2),
}),
inp: Some(inp),
log_rx1: Some(Box::pin(log_rx1)),
log_rx2: Some(log_rx2),
}
}
}
impl<S> Stream for LogItemMux<S>
where
S: Stream + Unpin,
<S as Stream>::Item: FromLogItem,
{
type Item = <S as Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let self2 = self.as_mut().get_mut();
LOG_QUEUE_4
.try_with(|x| {
let mut h = x.try_borrow_mut().unwrap();
h.log_tx = self2.tx.get_mut();
h.push_v0 = log_tx_send_real;
})
.unwrap();
let ret = loop {
let mut have_progress = false;
let mut have_pending = false;
if let Some(s) = self2.log_rx1.as_mut() {
match s.poll_next_unpin(cx) {
Ready(Some(x)) => {
if true {
let item = <S as Stream>::Item::from_log_item(x);
break Ready(Some(item));
} else {
log::info!("LogItemMux got {x:?}");
have_progress = true;
}
}
Ready(None) => {
self2.log_rx1 = None;
}
Pending => {
have_pending = true;
}
}
}
if let Some(s) = self2.inp.as_mut() {
match s.poll_next_unpin(cx) {
Ready(Some(x)) => break Ready(Some(x)),
Ready(None) => {
self2.inp = None;
if let Some(rx) = self2.log_rx1.as_ref() {
if rx.len() == 0 {
self2.log_rx1 = None;
continue;
} else {
continue;
}
} else {
}
}
Pending => {
have_pending = true;
}
}
}
break if have_progress {
continue;
} else if have_pending {
Pending
} else if self2.inp.is_none() {
if let Some(rx) = self2.log_rx1.as_ref() {
if rx.len() == 0 {
self2.log_rx1 = None;
continue;
} else {
continue;
}
} else {
Ready(None)
}
} else {
panic!("no progress no pending")
};
};
LOG_QUEUE_4
.try_with(|x| {
let mut h = x.try_borrow_mut().unwrap();
h.push_v0 = log_tx_send_noop;
})
.unwrap();
ret
}
}

View File

@@ -85,16 +85,16 @@ impl TimeBinnedFromLayers {
outbuf: VecDeque::new(),
};
info_init!("pbd_enable");
let item = LogItem::from_node(0, log::Level::TRACE, "test-log-item-trace".into());
let item = LogItem::from_node(log::Level::TRACE, "test-log-item-trace".into());
let item = StreamItem::Log(item);
ret.outbuf.push_back(Ok(item));
let item = LogItem::from_node(0, log::Level::DEBUG, "test-log-item-debug".into());
let item = LogItem::from_node(log::Level::DEBUG, "test-log-item-debug".into());
let item = StreamItem::Log(item);
ret.outbuf.push_back(Ok(item));
let item = LogItem::from_node(0, log::Level::INFO, "test-log-item-info".into());
let item = LogItem::from_node(log::Level::INFO, "test-log-item-info".into());
let item = StreamItem::Log(item);
ret.outbuf.push_back(Ok(item));
let item = LogItem::from_node(0, log::Level::WARN, "test-log-item-warn".into());
let item = LogItem::from_node(log::Level::WARN, "test-log-item-warn".into());
let item = StreamItem::Log(item);
ret.outbuf.push_back(Ok(item));
let item = StatsItem::Binning;

View File

@@ -17,6 +17,7 @@ use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::collect_s::CollectableDyn;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
@@ -403,6 +404,7 @@ fn take_collector_result_cbor(
pub fn timeoutable_collectable_stream_to_json_bytes(
stream: Pin<Box<dyn Stream<Item = Option<Option<Sitemty<Box<dyn CollectableDyn>>>>> + Send>>,
timeout_content_2: Duration,
emit_log_items: bool,
) -> Pin<Box<dyn Stream<Item = Result<JsonBytes, crate::json_stream::Error>> + Send>> {
let mut coll = None;
let mut last_emit = Instant::now();
@@ -426,20 +428,32 @@ pub fn timeoutable_collectable_stream_to_json_bytes(
RangeCompletableItem::RangeComplete => None,
},
StreamItem::Log(x) => {
if x.level == Level::ERROR {
error!("{}", x.msg);
} else if x.level == Level::WARN {
warn!("{}", x.msg);
} else if x.level == Level::INFO {
info!("{}", x.msg);
} else if x.level == Level::DEBUG {
debug!("{}", x.msg);
} else if x.level == Level::TRACE {
trace!("{}", x.msg);
if emit_log_items {
let obj = serde_json::json!({"type": "log", "obj": x});
match serde_json::to_string(&obj) {
Ok(json) => Some(Ok(JsonBytes::new(json))),
Err(e) => Some(Err(sitem_err2_from_string(e))),
}
} else {
trace!("{}", x.msg);
match x.level() {
Level::ERROR => {
error!("{}", x.display_log_file());
}
Level::WARN => {
warn!("{}", x.display_log_file());
}
Level::INFO => {
info!("{}", x.display_log_file());
}
Level::DEBUG => {
debug!("{}", x.display_log_file());
}
Level::TRACE => {
trace!("{}", x.display_log_file());
}
}
None
}
None
}
StreamItem::Stats(x) => {
debug!("{x:?}");
@@ -518,7 +532,7 @@ pub async fn timebinned_json_framed(
.chain(futures_util::stream::iter([None]));
let stream = TimeoutableStream::new(timeout_content_base, timeout_provider, stream);
let stream = Box::pin(stream);
let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2);
let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2, false);
Ok(stream)
}
@@ -587,7 +601,7 @@ pub async fn timebinned_cbor_framed(
RangeCompletableItem::RangeComplete => None,
},
StreamItem::Log(x) => {
if x.level <= log_items_level {
if x.level() <= log_items_level {
let mut buf = Vec::with_capacity(1024);
ciborium::into_writer(&x, &mut buf).expect("cbor serialize");
let bytes = Bytes::from(buf);