From f81bd3496bc123d1e1064afe5910d67f0b826bc7 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 18 Jun 2025 13:08:54 +0200 Subject: [PATCH] Add request log queue and adapt to LogItem api --- Cargo.toml | 1 + src/cbor_stream.rs | 14 +- src/collect.rs | 26 ++-- src/lib.rs | 2 + src/logfilter.rs | 29 ++++ src/logqueue.rs | 281 ++++++++++++++++++++++++++++++++++++++ src/timebin/fromlayers.rs | 8 +- src/timebinnedjson.rs | 42 ++++-- 8 files changed, 368 insertions(+), 35 deletions(-) create mode 100644 src/logfilter.rs create mode 100644 src/logqueue.rs diff --git a/Cargo.toml b/Cargo.toml index 9141b56..0c688a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ items_0 = { path = "../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../daqbuf-items-2", package = "daqbuf-items-2" } parse = { path = "../daqbuf-parse", package = "daqbuf-parse" } series = { path = "../daqbuf-series", package = "daqbuf-series" } +kanal = "0.1.1" [dev-dependencies] tokio = { version = "1", features = ["rt"] } diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index 1bc5f02..4d92409 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -96,21 +96,21 @@ where } }, StreamItem::Log(item) => { - match item.level { + match item.level() { Level::TRACE => { - trace!("{:?}", item); + trace!("{}", item.display_log_file()); } Level::DEBUG => { - debug!("{:?}", item); + debug!("{}", item.display_log_file()); } Level::INFO => { - info!("{:?}", item); + info!("{}", item.display_log_file()); } Level::WARN => { - warn!("{:?}", item); + warn!("{}", item.display_log_file()); } Level::ERROR => { - error!("{:?}", item); + error!("{}", item.display_log_file()); } } let item = CborBytes::new(Bytes::new()); @@ -240,7 +240,7 @@ impl FramedBytesToChannelEventsStream { if let Some(x) = item { Ok(Some(Ok(x))) } else { - let item = LogItem::from_node(0, Level::DEBUG, format!("decoded ciborium Value")); + let item = LogItem::from_node(Level::DEBUG, format!("decoded ciborium Value")); Ok(Some(Ok(StreamItem::Log(item)))) } } diff --git a/src/collect.rs b/src/collect.rs index b5786bc..895acf9 100644 --- a/src/collect.rs +++ b/src/collect.rs @@ -117,16 +117,22 @@ where } }, StreamItem::Log(item) => { - if item.level == Level::ERROR { - error!("node {} msg {}", item.node_ix, item.msg); - } else if item.level == Level::WARN { - warn!("node {} msg {}", item.node_ix, item.msg); - } else if item.level == Level::INFO { - info!("node {} msg {}", item.node_ix, item.msg); - } else if item.level == Level::DEBUG { - debug!("node {} msg {}", item.node_ix, item.msg); - } else if item.level == Level::TRACE { - trace!("node {} msg {}", item.node_ix, item.msg); + match item.level() { + Level::ERROR => { + error!("{}", item.display_log_file()); + } + Level::WARN => { + warn!("{}", item.display_log_file()); + } + Level::INFO => { + info!("{}", item.display_log_file()); + } + Level::DEBUG => { + debug!("{}", item.display_log_file()); + } + Level::TRACE => { + trace!("{}", item.display_log_file()); + } } Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index c2b452e..ebb1bf2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,8 @@ pub mod itemclone; pub mod json_stream; pub mod lenframe; pub mod lenframed; +pub mod logfilter; +pub mod logqueue; pub mod needminbuffer; pub mod plaineventscbor; pub mod plaineventsjson; diff --git a/src/logfilter.rs b/src/logfilter.rs new file mode 100644 index 0000000..81a3156 --- /dev/null +++ b/src/logfilter.rs @@ -0,0 +1,29 @@ +use futures_util::Stream; +use items_0::streamitem::AsLogItem; +use std::fmt; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct LogFilter { + inp: S, +} + +impl fmt::Debug for LogFilter { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("LogFilter").field("name", &"..").finish() + } +} + +impl Stream for LogFilter +where + S: Stream, + ::Item: AsLogItem, +{ + type Item = ::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + Pending + } +} diff --git a/src/logqueue.rs b/src/logqueue.rs new file mode 100644 index 0000000..6cb6786 --- /dev/null +++ b/src/logqueue.rs @@ -0,0 +1,281 @@ +use crate::log; +use async_channel::Receiver; +use async_channel::Sender; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::FromLogItem; +use items_0::streamitem::LogItem; +use std::cell::RefCell; +use std::cell::UnsafeCell; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +autoerr::create_error_v1!( + name(PushError, "PushError"), + enum variants { + Full, + }, +); + +pub struct RequestLogItemBuffer { + buf: VecDeque, +} + +impl RequestLogItemBuffer { + pub fn new() -> Self { + Self { + buf: VecDeque::new(), + } + } + + pub fn push_back(&mut self, item: LogItem) -> Result<(), PushError> { + self.buf.push_back(item); + Ok(()) + } +} + +pub struct LogQueueFutureWrap { + reqid: String, + fut: Pin>, + logbuf: RequestLogItemBuffer, +} + +impl LogQueueFutureWrap { + pub fn new(reqid: String, fut: F) -> Self { + let fut = Box::pin(fut); + let logbuf = RequestLogItemBuffer::new(); + Self { reqid, fut, logbuf } + } +} + +impl Future for LogQueueFutureWrap +where + F: Future, +{ + type Output = ::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let self2 = self.as_mut().get_mut(); + LOG_QUEUE_3 + .try_with(|x| { + let mut h = x.try_borrow_mut().unwrap(); + h.logbuf = &mut self2.logbuf as *mut _; + h.push_v0 = log_push_v0; + }) + .unwrap(); + let ret = self2.fut.poll_unpin(cx); + LOG_QUEUE_3 + .try_with(|x| { + let mut h = x.try_borrow_mut().unwrap(); + h.push_v0 = log_push_noop; + }) + .unwrap(); + ret + } +} + +thread_local! { + static LOG_QUEUE_0: RefCell> = const { RefCell::new(VecDeque::new()) }; + static LOG_QUEUE_1: *mut VecDeque = const { std::ptr::null_mut() }; + static LOG_QUEUE_2: LogQueueFnPtrs = const { LogQueueFnPtrs::default() }; + static LOG_QUEUE_3: RefCell = const { RefCell::new(LogQueueFnPtrs::default()) }; + static LOG_QUEUE_4: RefCell = const { RefCell::new(LogQueueFnPtrsB::default()) }; +} + +pub struct LogQueueFnPtrs { + logbuf: *mut RequestLogItemBuffer, + push_v0: fn(*mut RequestLogItemBuffer, LogItem) -> Result<(), PushError>, +} + +impl LogQueueFnPtrs { + const fn default() -> Self { + Self { + logbuf: std::ptr::null_mut(), + push_v0: log_push_noop, + } + } +} + +pub struct LogQueueFnPtrsB { + log_tx: *const LogItemMuxTx, + push_v0: fn(*const LogItemMuxTx, LogItem) -> Result<(), PushError>, +} + +impl LogQueueFnPtrsB { + const fn default() -> Self { + Self { + log_tx: std::ptr::null_mut(), + push_v0: log_tx_send_noop, + } + } +} + +fn push_log_item_queue_3(item: LogItem) -> Result<(), PushError> { + LOG_QUEUE_3 + .try_with(|x| { + let h = x.try_borrow().unwrap(); + (h.push_v0)(h.logbuf, item) + }) + .unwrap() +} + +fn push_log_item_queue_4(item: LogItem) -> Result<(), PushError> { + LOG_QUEUE_4 + .try_with(|x| { + let h = x.try_borrow().unwrap(); + (h.push_v0)(h.log_tx, item) + }) + .unwrap() +} + +pub fn push_log_item(item: LogItem) -> Result<(), PushError> { + push_log_item_queue_4(item) +} + +fn log_push_noop(logbuf: *mut RequestLogItemBuffer, item: LogItem) -> Result<(), PushError> { + Ok(()) +} + +fn log_push_v0(logbuf: *mut RequestLogItemBuffer, item: LogItem) -> Result<(), PushError> { + unsafe { &mut *logbuf }.push_back(item) +} + +fn log_tx_send_noop(log_tx: *const LogItemMuxTx, item: LogItem) -> Result<(), PushError> { + Ok(()) +} + +fn log_tx_send_real(log_tx: *const LogItemMuxTx, item: LogItem) -> Result<(), PushError> { + if let Some(tx) = unsafe { &*log_tx }.log_tx1.as_ref() { + match tx.try_send(item) { + Ok(_) => {} + Err(e) => { + eprintln!("-------------------- log_tx_send_real Err {e}"); + } + } + } else { + eprintln!("-------------------- log_tx_send_real NO TX"); + } + Ok(()) +} + +struct LogItemMuxTx { + reqid: String, + log_tx1: Option>>>, + log_tx2: Option>, +} + +pub struct LogItemMux { + tx: UnsafeCell, + inp: Option, + log_rx1: Option>>>, + log_rx2: Option>, +} + +impl LogItemMux { + pub fn new(inp: S, reqid: String) -> Self { + let (log_tx1, log_rx1) = async_channel::bounded(4000); + let (log_tx2, log_rx2) = kanal::bounded_async(4000); + Self { + tx: UnsafeCell::new(LogItemMuxTx { + reqid, + log_tx1: Some(Box::pin(log_tx1)), + log_tx2: Some(log_tx2), + }), + inp: Some(inp), + log_rx1: Some(Box::pin(log_rx1)), + log_rx2: Some(log_rx2), + } + } +} + +impl Stream for LogItemMux +where + S: Stream + Unpin, + ::Item: FromLogItem, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let self2 = self.as_mut().get_mut(); + LOG_QUEUE_4 + .try_with(|x| { + let mut h = x.try_borrow_mut().unwrap(); + h.log_tx = self2.tx.get_mut(); + h.push_v0 = log_tx_send_real; + }) + .unwrap(); + let ret = loop { + let mut have_progress = false; + let mut have_pending = false; + if let Some(s) = self2.log_rx1.as_mut() { + match s.poll_next_unpin(cx) { + Ready(Some(x)) => { + if true { + let item = ::Item::from_log_item(x); + break Ready(Some(item)); + } else { + log::info!("LogItemMux got {x:?}"); + have_progress = true; + } + } + Ready(None) => { + self2.log_rx1 = None; + } + Pending => { + have_pending = true; + } + } + } + if let Some(s) = self2.inp.as_mut() { + match s.poll_next_unpin(cx) { + Ready(Some(x)) => break Ready(Some(x)), + Ready(None) => { + self2.inp = None; + if let Some(rx) = self2.log_rx1.as_ref() { + if rx.len() == 0 { + self2.log_rx1 = None; + continue; + } else { + continue; + } + } else { + } + } + Pending => { + have_pending = true; + } + } + } + break if have_progress { + continue; + } else if have_pending { + Pending + } else if self2.inp.is_none() { + if let Some(rx) = self2.log_rx1.as_ref() { + if rx.len() == 0 { + self2.log_rx1 = None; + continue; + } else { + continue; + } + } else { + Ready(None) + } + } else { + panic!("no progress no pending") + }; + }; + LOG_QUEUE_4 + .try_with(|x| { + let mut h = x.try_borrow_mut().unwrap(); + h.push_v0 = log_tx_send_noop; + }) + .unwrap(); + ret + } +} diff --git a/src/timebin/fromlayers.rs b/src/timebin/fromlayers.rs index 574bb5c..4e470e9 100644 --- a/src/timebin/fromlayers.rs +++ b/src/timebin/fromlayers.rs @@ -85,16 +85,16 @@ impl TimeBinnedFromLayers { outbuf: VecDeque::new(), }; info_init!("pbd_enable"); - let item = LogItem::from_node(0, log::Level::TRACE, "test-log-item-trace".into()); + let item = LogItem::from_node(log::Level::TRACE, "test-log-item-trace".into()); let item = StreamItem::Log(item); ret.outbuf.push_back(Ok(item)); - let item = LogItem::from_node(0, log::Level::DEBUG, "test-log-item-debug".into()); + let item = LogItem::from_node(log::Level::DEBUG, "test-log-item-debug".into()); let item = StreamItem::Log(item); ret.outbuf.push_back(Ok(item)); - let item = LogItem::from_node(0, log::Level::INFO, "test-log-item-info".into()); + let item = LogItem::from_node(log::Level::INFO, "test-log-item-info".into()); let item = StreamItem::Log(item); ret.outbuf.push_back(Ok(item)); - let item = LogItem::from_node(0, log::Level::WARN, "test-log-item-warn".into()); + let item = LogItem::from_node(log::Level::WARN, "test-log-item-warn".into()); let item = StreamItem::Log(item); ret.outbuf.push_back(Ok(item)); let item = StatsItem::Binning; diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 4a3c8f3..9d0e748 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -17,6 +17,7 @@ use futures_util::StreamExt; use futures_util::TryStreamExt; use items_0::collect_s::CollectableDyn; use items_0::on_sitemty_data; +use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -403,6 +404,7 @@ fn take_collector_result_cbor( pub fn timeoutable_collectable_stream_to_json_bytes( stream: Pin>>>> + Send>>, timeout_content_2: Duration, + emit_log_items: bool, ) -> Pin> + Send>> { let mut coll = None; let mut last_emit = Instant::now(); @@ -426,20 +428,32 @@ pub fn timeoutable_collectable_stream_to_json_bytes( RangeCompletableItem::RangeComplete => None, }, StreamItem::Log(x) => { - if x.level == Level::ERROR { - error!("{}", x.msg); - } else if x.level == Level::WARN { - warn!("{}", x.msg); - } else if x.level == Level::INFO { - info!("{}", x.msg); - } else if x.level == Level::DEBUG { - debug!("{}", x.msg); - } else if x.level == Level::TRACE { - trace!("{}", x.msg); + if emit_log_items { + let obj = serde_json::json!({"type": "log", "obj": x}); + match serde_json::to_string(&obj) { + Ok(json) => Some(Ok(JsonBytes::new(json))), + Err(e) => Some(Err(sitem_err2_from_string(e))), + } } else { - trace!("{}", x.msg); + match x.level() { + Level::ERROR => { + error!("{}", x.display_log_file()); + } + Level::WARN => { + warn!("{}", x.display_log_file()); + } + Level::INFO => { + info!("{}", x.display_log_file()); + } + Level::DEBUG => { + debug!("{}", x.display_log_file()); + } + Level::TRACE => { + trace!("{}", x.display_log_file()); + } + } + None } - None } StreamItem::Stats(x) => { debug!("{x:?}"); @@ -518,7 +532,7 @@ pub async fn timebinned_json_framed( .chain(futures_util::stream::iter([None])); let stream = TimeoutableStream::new(timeout_content_base, timeout_provider, stream); let stream = Box::pin(stream); - let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2); + let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2, false); Ok(stream) } @@ -587,7 +601,7 @@ pub async fn timebinned_cbor_framed( RangeCompletableItem::RangeComplete => None, }, StreamItem::Log(x) => { - if x.level <= log_items_level { + if x.level() <= log_items_level { let mut buf = Vec::with_capacity(1024); ciborium::into_writer(&x, &mut buf).expect("cbor serialize"); let bytes = Bytes::from(buf);