diff --git a/crates/disk/src/eventchunker.rs b/crates/disk/src/eventchunker.rs index 2783dd3..b0a5003 100644 --- a/crates/disk/src/eventchunker.rs +++ b/crates/disk/src/eventchunker.rs @@ -344,7 +344,7 @@ impl EventChunker { self.dbg_path ); warn!("{}", msg); - let item = LogItem::from_node(self.node_ix, Level::INFO, msg); + let item = LogItem::from_node(Level::INFO, msg); self.log_items.push_back(item); } if self.seen_before_range_count > 100 { @@ -442,7 +442,7 @@ impl EventChunker { scalar_type, self.fetch_info.scalar_type(), ); - let item = LogItem::from_node(self.node_ix, Level::WARN, msg); + let item = LogItem::from_node(Level::WARN, msg); log_items.push(item); } if false { @@ -452,7 +452,7 @@ impl EventChunker { discard = true; self.discard_count_shape += 1; let msg = format!("shape mismatch {:?} {:?}", shape_this, self.fetch_info.shape(),); - let item = LogItem::from_node(self.node_ix, Level::WARN, msg); + let item = LogItem::from_node(Level::WARN, msg); log_items.push(item); } } @@ -486,7 +486,7 @@ impl EventChunker { self.fetch_info.shape(), sh, ); - let item = LogItem::from_node(self.node_ix, Level::WARN, msg); + let item = LogItem::from_node(Level::WARN, msg); log_items.push(item); } } @@ -499,7 +499,7 @@ impl EventChunker { self.fetch_info.scalar_type(), self.fetch_info.shape(), ); - let item = LogItem::from_node(self.node_ix, Level::WARN, msg); + let item = LogItem::from_node(Level::WARN, msg); log_items.push(item); } } diff --git a/crates/disk/src/eventchunkermultifile.rs b/crates/disk/src/eventchunkermultifile.rs index 8aca1e2..46edbc9 100644 --- a/crates/disk/src/eventchunkermultifile.rs +++ b/crates/disk/src/eventchunkermultifile.rs @@ -136,11 +136,7 @@ impl Stream for EventChunkerMultifile { if min <= self.max_ts { let msg = format!("EventChunkerMultifile repeated or unordered ts {}", min); error!("{}", msg); - let item = LogItem { - node_ix: self.node_ix as _, - level: Level::INFO, - msg, - }; + let item = LogItem::info(msg); self.log_queue.push_back(item); } self.max_ts = max; @@ -185,11 +181,11 @@ impl Stream for EventChunkerMultifile { Ready(Some(k)) => match k { Ok(ofs) => { let msg = format!("received files for timebin {:?}", ofs.timebin); - let item = LogItem::from_node(self.node_ix, Level::INFO, msg); + let item = LogItem::from_node(Level::INFO, msg); self.log_queue.push_back(item); for e in &ofs.files { let msg = format!("file {:?}", e); - let item = LogItem::from_node(self.node_ix, Level::INFO, msg); + let item = LogItem::from_node(Level::INFO, msg); self.log_queue.push_back(item); } self.files_count += ofs.files.len() as u32; @@ -222,7 +218,7 @@ impl Stream for EventChunkerMultifile { continue; } else if ofs.files.len() == 0 { let msg = format!("use opened files {:?} no files", ofs); - let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg); + let item = LogItem::from_node(Level::DEBUG, msg); Ready(Some(Ok(StreamItem::Log(item)))) } else { let mut chunkers = Vec::new(); @@ -249,7 +245,7 @@ impl Stream for EventChunkerMultifile { let filtered = RangeFilter2::new(merged, self.range.clone(), self.one_before); self.evs = Some(Box::pin(filtered)); let msg = format!("LOCALLY MERGED"); - let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg); + let item = LogItem::from_node(Level::DEBUG, msg); Ready(Some(Ok(StreamItem::Log(item)))) } } @@ -261,7 +257,6 @@ impl Stream for EventChunkerMultifile { Ready(None) => { self.done = true; let item = LogItem::from_node( - self.node_ix, Level::DEBUG, format!( "EventChunkerMultifile used {} datafiles beg {} end {} node_ix {}", diff --git a/crates/disk/src/eventfilter.rs b/crates/disk/src/eventfilter.rs index f043525..d1d1098 100644 --- a/crates/disk/src/eventfilter.rs +++ b/crates/disk/src/eventfilter.rs @@ -35,7 +35,7 @@ impl EventFullShapeFilter { "EventFullShapeFilter shape_derived mismatch {:?} {:?}", sh, self.shape_exp ); - let item = LogItem::from_node(node_ix, Level::WARN, msg); + let item = LogItem::from_node(Level::WARN, msg); self.log_items.push_back(item); false } else { @@ -47,7 +47,7 @@ impl EventFullShapeFilter { "EventFullShapeFilter shape_derived mismatch {:?} {:?}", sh, self.shape_exp ); - let item = LogItem::from_node(self.node_ix, Level::WARN, msg); + let item = LogItem::from_node(Level::WARN, msg); self.log_items.push_back(item); false } diff --git a/crates/disk/src/streamlog.rs b/crates/disk/src/streamlog.rs index af6f897..ceb7627 100644 --- a/crates/disk/src/streamlog.rs +++ b/crates/disk/src/streamlog.rs @@ -16,11 +16,7 @@ impl Streamlog { } pub fn append(&mut self, level: Level, msg: String) { - let item = LogItem { - node_ix: self.node_ix, - level, - msg, - }; + let item = LogItem::level_msg(level, msg); self.items.push_back(item); } @@ -29,21 +25,21 @@ impl Streamlog { } pub fn emit(item: &LogItem) { - match item.level { + match item.level() { Level::ERROR => { - error!("StreamLog Node {} {}", item.node_ix, item.msg); + error!("StreamLog {}", item.display_log_file()); } Level::WARN => { - warn!("StreamLog Node {} {}", item.node_ix, item.msg); + warn!("StreamLog {}", item.display_log_file()); } Level::INFO => { - info!("StreamLog Node {} {}", item.node_ix, item.msg); + info!("StreamLog {}", item.display_log_file()); } Level::DEBUG => { - debug!("StreamLog Node {} {}", item.node_ix, item.msg); + debug!("StreamLog {}", item.display_log_file()); } Level::TRACE => { - trace!("StreamLog Node {} {}", item.node_ix, item.msg); + trace!("StreamLog {}", item.display_log_file()); } } } diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index ad7b48c..d96abc8 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -732,19 +732,22 @@ impl DataApiPython3DataStream { } }, StreamItem::Log(k) => { - let nodeix = k.node_ix; - if k.level == Level::ERROR { - tracing::event!(Level::ERROR, nodeix, message = k.msg); - } else if k.level == Level::WARN { - tracing::event!(Level::WARN, nodeix, message = k.msg); - } else if k.level == Level::INFO { - tracing::event!(Level::INFO, nodeix, message = k.msg); - } else if k.level == Level::DEBUG { - tracing::event!(Level::DEBUG, nodeix, message = k.msg); - } else if k.level == Level::TRACE { - tracing::event!(Level::TRACE, nodeix, message = k.msg); - } else { - tracing::event!(Level::TRACE, nodeix, message = k.msg); + match k.level() { + Level::ERROR => { + error!("{}", k.display_log_file()); + } + Level::WARN => { + warn!("{}", k.display_log_file()); + } + Level::INFO => { + info!("{}", k.display_log_file()); + } + Level::DEBUG => { + debug!("{}", k.display_log_file()); + } + Level::TRACE => { + trace!("{}", k.display_log_file()); + } } Ok(BytesMut::new()) } diff --git a/crates/httpret/src/api4/binned_v2.rs b/crates/httpret/src/api4/binned_v2.rs index 656ae93..82be1cd 100644 --- a/crates/httpret/src/api4/binned_v2.rs +++ b/crates/httpret/src/api4/binned_v2.rs @@ -280,10 +280,7 @@ async fn binned_json_framed( let stream = stream.map_err(Error::from); // let msg = format!("{}", res2.url.as_str()); // let stream = futures_util::stream::iter([Ok(msg)]).chain(stream); - let stream = stream.map(|x| { - // - x - }); + let stream = stream.map(|x| x); // use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight; use items_0::streamitem::StreamItem; @@ -321,6 +318,7 @@ async fn binned_json_framed( Err(e) => Err(e), } }); + let stream = stream.map(|x| x); let stream = stream.map_err(|e| daqbuf_err::Error::from_string(e)); let timeout_content_base = res2 .query @@ -328,11 +326,14 @@ async fn binned_json_framed( .unwrap_or(Duration::from_millis(2000)) .min(Duration::from_millis(8000)) .max(Duration::from_millis(334)); + let stream = stream.map(|x| x); + let stream = streams::logqueue::LogItemMux::new(stream, ctx.reqid().into()); let timeout_content_2 = timeout_content_base * 2 / 3; let stream = stream.map(|x| Some(x)).chain(futures_util::stream::iter([None])); let stream = TimeoutableStream::new(timeout_content_base, res2.timeout_provider, stream); + let stream = stream.map(|x| x); let stream = Box::pin(stream); - let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2); + let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2, true); // let stream = stream.map(|x| Ok(format!("dummy82749827348932"))); // Box::pin(stream) as Pin + Send>> stream diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 560b83c..54f2860 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -228,6 +228,19 @@ async fn the_service_fn( shared_res: Arc, ) -> Result { let ctx = ReqCtx::new_with_node(&req, &node_config); + let reqid = ctx.reqid().into(); + let fut = the_service_fn_log_enabled(ctx, req, addr, node_config, service_version, shared_res); + streams::logqueue::LogQueueFutureWrap::new(reqid, fut).await +} + +async fn the_service_fn_log_enabled( + ctx: ReqCtx, + req: Requ, + addr: SocketAddr, + node_config: NodeConfigCached, + service_version: ServiceVersion, + shared_res: Arc, +) -> Result { let reqid_span = span!(Level::INFO, "req", reqid = ctx.reqid()); let f = http_service(req, addr, ctx, node_config, service_version, shared_res); let f = Cont { f: Box::pin(f) }; diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index e71cd9a..8eb1fb5 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -177,11 +177,7 @@ async fn events_conn_handler_with_reqid( } } { - let item = LogItem { - node_ix: ncc.ix as _, - level: Level::DEBUG, - msg: format!("buf_len_histo: {:?}", buf_len_histo), - }; + let item = LogItem::level_msg(Level::DEBUG, format!("buf_len_histo: {:?}", buf_len_histo)); let item: Sitemty = Ok(StreamItem::Log(item)); let buf = match item.make_frame_dyn() { Ok(k) => k, diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index e52d2f2..23cd4db 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -8,13 +8,14 @@ edition = "2024" futures-util = "0.3.31" pin-project = "1" async-channel = "2.3.1" -scylla = "1.1" +scylla = "1.2" serde = { version = "1", features = ["derive"] } serde_json = "1" time = { version = "0.3.41", features = ["parsing", "formatting", "macros"] } hashbrown = "0.15.3" autoerr = "0.0" daqbuf-err = { path = "../../../daqbuf-err" } +log = { path = "../../../daqbuf-log", package = "daqbuf-log" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../../../daqbuf-query", package = "daqbuf-query" } items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } diff --git a/crates/scyllaconn/src/binned2/frombinned.rs b/crates/scyllaconn/src/binned2/frombinned.rs index 338f30e..f771c14 100644 --- a/crates/scyllaconn/src/binned2/frombinned.rs +++ b/crates/scyllaconn/src/binned2/frombinned.rs @@ -123,6 +123,10 @@ impl FromBinned { } fn build_day1_jobs(&mut self) { + { + let item = items_0::streamitem::LogItem::info("some log string".into()); + streams::logqueue::push_log_item(item); + } let mut jobs = VecDeque::new(); let pbp1 = PrebinnedPartitioning::Day1; self.push_string(format!("binrange {:?}", self.binrange)); diff --git a/crates/scyllaconn/src/binwriteindex.rs b/crates/scyllaconn/src/binwriteindex.rs index 0015844..d9c382b 100644 --- a/crates/scyllaconn/src/binwriteindex.rs +++ b/crates/scyllaconn/src/binwriteindex.rs @@ -12,8 +12,8 @@ use items_0::streamitem::LogItem; use items_0::streamitem::Sitemty3; use items_0::streamitem::StreamItem; use items_0::streamitem::sitem3_data; +use log::log_item_emit as lg; use netpod::DtMs; -use netpod::log; use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; use std::collections::VecDeque; @@ -22,8 +22,8 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } -macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } +macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } autoerr::create_error_v1!( name(Error, "BinWriteIndexRtStream"), @@ -81,6 +81,11 @@ impl BinWriteIndexRtStream { range: NanoRange, scyqueue: ScyllaQueue, ) -> Self { + lg::info!("============================ log item emitted from binwriteindex.rs"); + lg::info!( + "============================ log item emitted from binwriteindex.rs WITH PARAM {}", + 42 + ); info!("{}::new INFO/DEBUG test", Self::type_name()); debug!("{}::new", Self::type_name()); let (msp_beg, lsp_beg) = pbp.msp_lsp(range.beg_ts().to_ts_ms());