Adapt to LogItem api

Dominik Werder
2025-06-18 13:09:24 +02:00
parent 17c24d0c15
commit 72cff94b21
11 changed files with 69 additions and 55 deletions

View File

@@ -344,7 +344,7 @@ impl EventChunker {
self.dbg_path
);
warn!("{}", msg);
-let item = LogItem::from_node(self.node_ix, Level::INFO, msg);
+let item = LogItem::from_node(Level::INFO, msg);
self.log_items.push_back(item);
}
if self.seen_before_range_count > 100 {
@@ -442,7 +442,7 @@ impl EventChunker {
scalar_type,
self.fetch_info.scalar_type(),
);
-let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
+let item = LogItem::from_node(Level::WARN, msg);
log_items.push(item);
}
if false {
@@ -452,7 +452,7 @@ impl EventChunker {
discard = true;
self.discard_count_shape += 1;
let msg = format!("shape mismatch {:?} {:?}", shape_this, self.fetch_info.shape(),);
-let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
+let item = LogItem::from_node(Level::WARN, msg);
log_items.push(item);
}
}
@@ -486,7 +486,7 @@ impl EventChunker {
self.fetch_info.shape(),
sh,
);
-let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
+let item = LogItem::from_node(Level::WARN, msg);
log_items.push(item);
}
}
@@ -499,7 +499,7 @@ impl EventChunker {
self.fetch_info.scalar_type(),
self.fetch_info.shape(),
);
-let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
+let item = LogItem::from_node(Level::WARN, msg);
log_items.push(item);
}
}
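
Every call site in this file drops the node_ix argument. Pieced together from the constructors and accessors this commit uses (from_node, info, level_msg, level(), display_log_file()), the reworked LogItem surface plausibly looks like the sketch below; the field layout, the Level type, and the display return type are assumptions, not the actual items_0 implementation.

use std::fmt;
use tracing::Level; // stand-in; the codebase may define its own Level type

// Hypothetical sketch of the new LogItem API as implied by the call sites
// in this commit; the real type lives in items_0::streamitem.
pub struct LogItem {
    level: Level,
    msg: String,
}

impl LogItem {
    // Replaces the old from_node(node_ix, level, msg): node identity is
    // presumably attached elsewhere now (e.g. by the per-request log queue).
    pub fn from_node(level: Level, msg: String) -> Self {
        Self { level, msg }
    }
    pub fn info(msg: String) -> Self {
        Self::from_node(Level::INFO, msg)
    }
    pub fn level_msg(level: Level, msg: String) -> Self {
        Self::from_node(level, msg)
    }
    pub fn level(&self) -> Level {
        // assumes the Level type is Copy, as tracing's is
        self.level
    }
    // Something Display-able for log files; the exact output is unknown.
    pub fn display_log_file(&self) -> impl fmt::Display + '_ {
        &self.msg
    }
}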

View File

@@ -136,11 +136,7 @@ impl Stream for EventChunkerMultifile {
if min <= self.max_ts {
let msg = format!("EventChunkerMultifile repeated or unordered ts {}", min);
error!("{}", msg);
-let item = LogItem {
-node_ix: self.node_ix as _,
-level: Level::INFO,
-msg,
-};
+let item = LogItem::info(msg);
self.log_queue.push_back(item);
}
self.max_ts = max;
@@ -185,11 +181,11 @@ impl Stream for EventChunkerMultifile {
Ready(Some(k)) => match k {
Ok(ofs) => {
let msg = format!("received files for timebin {:?}", ofs.timebin);
-let item = LogItem::from_node(self.node_ix, Level::INFO, msg);
+let item = LogItem::from_node(Level::INFO, msg);
self.log_queue.push_back(item);
for e in &ofs.files {
let msg = format!("file {:?}", e);
-let item = LogItem::from_node(self.node_ix, Level::INFO, msg);
+let item = LogItem::from_node(Level::INFO, msg);
self.log_queue.push_back(item);
}
self.files_count += ofs.files.len() as u32;
@@ -222,7 +218,7 @@ impl Stream for EventChunkerMultifile {
continue;
} else if ofs.files.len() == 0 {
let msg = format!("use opened files {:?} no files", ofs);
-let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
+let item = LogItem::from_node(Level::DEBUG, msg);
Ready(Some(Ok(StreamItem::Log(item))))
} else {
let mut chunkers = Vec::new();
@@ -249,7 +245,7 @@ impl Stream for EventChunkerMultifile {
let filtered = RangeFilter2::new(merged, self.range.clone(), self.one_before);
self.evs = Some(Box::pin(filtered));
let msg = format!("LOCALLY MERGED");
-let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
+let item = LogItem::from_node(Level::DEBUG, msg);
Ready(Some(Ok(StreamItem::Log(item))))
}
}
@@ -261,7 +257,6 @@ impl Stream for EventChunkerMultifile {
Ready(None) => {
self.done = true;
let item = LogItem::from_node(
-self.node_ix,
Level::DEBUG,
format!(
"EventChunkerMultifile used {} datafiles beg {} end {} node_ix {}",

View File

@@ -35,7 +35,7 @@ impl<INP> EventFullShapeFilter<INP> {
"EventFullShapeFilter shape_derived mismatch {:?} {:?}",
sh, self.shape_exp
);
-let item = LogItem::from_node(node_ix, Level::WARN, msg);
+let item = LogItem::from_node(Level::WARN, msg);
self.log_items.push_back(item);
false
} else {
@@ -47,7 +47,7 @@ impl<INP> EventFullShapeFilter<INP> {
"EventFullShapeFilter shape_derived mismatch {:?} {:?}",
sh, self.shape_exp
);
-let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
+let item = LogItem::from_node(Level::WARN, msg);
self.log_items.push_back(item);
false
}

View File

@@ -16,11 +16,7 @@ impl Streamlog {
}
pub fn append(&mut self, level: Level, msg: String) {
-let item = LogItem {
-node_ix: self.node_ix,
-level,
-msg,
-};
+let item = LogItem::level_msg(level, msg);
self.items.push_back(item);
}
@@ -29,21 +25,21 @@ impl Streamlog {
}
pub fn emit(item: &LogItem) {
-match item.level {
+match item.level() {
Level::ERROR => {
error!("StreamLog Node {} {}", item.node_ix, item.msg);
error!("StreamLog {}", item.display_log_file());
}
Level::WARN => {
warn!("StreamLog Node {} {}", item.node_ix, item.msg);
warn!("StreamLog {}", item.display_log_file());
}
Level::INFO => {
info!("StreamLog Node {} {}", item.node_ix, item.msg);
info!("StreamLog {}", item.display_log_file());
}
Level::DEBUG => {
debug!("StreamLog Node {} {}", item.node_ix, item.msg);
debug!("StreamLog {}", item.display_log_file());
}
Level::TRACE => {
trace!("StreamLog Node {} {}", item.node_ix, item.msg);
trace!("StreamLog {}", item.display_log_file());
}
}
}
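
The five emit arms now differ only in which log macro fires. Not part of this commit, but they could in principle be folded with a small helper macro; a sketch, assuming the level type is a plain five-variant enum, as the exhaustive match above suggests:

// Sketch only: generate one match arm per (variant, macro) pair.
macro_rules! emit_arm {
    ($item:expr, $($lvl:ident => $mac:ident),* $(,)?) => {
        match $item.level() {
            $(Level::$lvl => $mac!("StreamLog {}", $item.display_log_file()),)*
        }
    };
}

pub fn emit(item: &LogItem) {
    emit_arm!(item,
        ERROR => error,
        WARN => warn,
        INFO => info,
        DEBUG => debug,
        TRACE => trace,
    );
}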

View File

@@ -732,19 +732,22 @@ impl DataApiPython3DataStream {
}
},
StreamItem::Log(k) => {
-let nodeix = k.node_ix;
-if k.level == Level::ERROR {
-tracing::event!(Level::ERROR, nodeix, message = k.msg);
-} else if k.level == Level::WARN {
-tracing::event!(Level::WARN, nodeix, message = k.msg);
-} else if k.level == Level::INFO {
-tracing::event!(Level::INFO, nodeix, message = k.msg);
-} else if k.level == Level::DEBUG {
-tracing::event!(Level::DEBUG, nodeix, message = k.msg);
-} else if k.level == Level::TRACE {
-tracing::event!(Level::TRACE, nodeix, message = k.msg);
-} else {
-tracing::event!(Level::TRACE, nodeix, message = k.msg);
+match k.level() {
+Level::ERROR => {
+error!("{}", k.display_log_file());
+}
+Level::WARN => {
+warn!("{}", k.display_log_file());
+}
+Level::INFO => {
+info!("{}", k.display_log_file());
+}
+Level::DEBUG => {
+debug!("{}", k.display_log_file());
+}
+Level::TRACE => {
+trace!("{}", k.display_log_file());
+}
+}
}
Ok(BytesMut::new())
}

View File

@@ -280,10 +280,7 @@ async fn binned_json_framed(
let stream = stream.map_err(Error::from);
// let msg = format!("{}", res2.url.as_str());
// let stream = futures_util::stream::iter([Ok(msg)]).chain(stream);
-let stream = stream.map(|x| {
-//
-x
-});
+let stream = stream.map(|x| x);
// use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight;
use items_0::streamitem::StreamItem;
@@ -321,6 +318,7 @@ async fn binned_json_framed(
Err(e) => Err(e),
}
});
+let stream = stream.map(|x| x);
let stream = stream.map_err(|e| daqbuf_err::Error::from_string(e));
let timeout_content_base = res2
.query
@@ -328,11 +326,14 @@ async fn binned_json_framed(
.unwrap_or(Duration::from_millis(2000))
.min(Duration::from_millis(8000))
.max(Duration::from_millis(334));
+let stream = stream.map(|x| x);
+let stream = streams::logqueue::LogItemMux::new(stream, ctx.reqid().into());
let timeout_content_2 = timeout_content_base * 2 / 3;
let stream = stream.map(|x| Some(x)).chain(futures_util::stream::iter([None]));
let stream = TimeoutableStream::new(timeout_content_base, res2.timeout_provider, stream);
+let stream = stream.map(|x| x);
let stream = Box::pin(stream);
-let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2);
+let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2, true);
// let stream = stream.map(|x| Ok(format!("dummy82749827348932")));
// Box::pin(stream) as Pin<Box<dyn Stream<Item = _> + Send>>
stream
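
LogItemMux is new in this hunk; together with push_log_item later in the commit, it presumably drains log items queued under the request id into the response stream. A rough sketch of that assumed behavior — take_log_item is hypothetical, and the real type lives in streams::logqueue:

use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::Stream;
use items_0::streamitem::LogItem;

// Hypothetical accessor; stands in for whatever streams::logqueue provides.
fn take_log_item(_reqid: &str) -> Option<LogItem> {
    None
}

// Sketch: before polling the inner stream, drain any LogItem queued under
// this request id and yield it first.
struct LogItemMuxSketch<S> {
    reqid: String,
    inner: Pin<Box<S>>,
}

impl<S, T> Stream for LogItemMuxSketch<S>
where
    S: Stream<Item = T>,
    T: From<LogItem>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // All fields are Unpin here, so get_mut is fine.
        let this = self.get_mut();
        if let Some(item) = take_log_item(&this.reqid) {
            return Poll::Ready(Some(T::from(item)));
        }
        this.inner.as_mut().poll_next(cx)
    }
}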

View File

@@ -228,6 +228,19 @@ async fn the_service_fn(
shared_res: Arc<ServiceSharedResources>,
) -> Result<StreamResponse, Error> {
let ctx = ReqCtx::new_with_node(&req, &node_config);
+let reqid = ctx.reqid().into();
+let fut = the_service_fn_log_enabled(ctx, req, addr, node_config, service_version, shared_res);
+streams::logqueue::LogQueueFutureWrap::new(reqid, fut).await
+}
+async fn the_service_fn_log_enabled(
+ctx: ReqCtx,
+req: Requ,
+addr: SocketAddr,
+node_config: NodeConfigCached,
+service_version: ServiceVersion,
+shared_res: Arc<ServiceSharedResources>,
+) -> Result<StreamResponse, Error> {
let reqid_span = span!(Level::INFO, "req", reqid = ctx.reqid());
let f = http_service(req, addr, ctx, node_config, service_version, shared_res);
let f = Cont { f: Box::pin(f) };
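
LogQueueFutureWrap wraps the whole handler future under the request id, presumably scoping the per-request log queue that push_log_item and LogItemMux rely on. A sketch of that wrapper shape using pin-project, which the workspace already depends on; the guard and registration hook are hypothetical:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project::pin_project;

// Hypothetical RAII guard; stands in for real queue (un)registration.
struct QueueGuard;
impl Drop for QueueGuard {
    fn drop(&mut self) {}
}
fn logqueue_enter(_reqid: &str) -> QueueGuard {
    QueueGuard
}

// Sketch only: the real LogQueueFutureWrap lives in streams::logqueue.
#[pin_project]
struct LogQueueFutureWrapSketch<F> {
    reqid: String,
    #[pin]
    fut: F,
}

impl<F: Future> Future for LogQueueFutureWrapSketch<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // Make the queue for this reqid current while the handler runs,
        // so push_log_item (used elsewhere in this commit) can find it.
        let _guard = logqueue_enter(this.reqid);
        this.fut.poll(cx)
    }
}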

View File

@@ -177,11 +177,7 @@ async fn events_conn_handler_with_reqid(
}
}
{
-let item = LogItem {
-node_ix: ncc.ix as _,
-level: Level::DEBUG,
-msg: format!("buf_len_histo: {:?}", buf_len_histo),
-};
+let item = LogItem::level_msg(Level::DEBUG, format!("buf_len_histo: {:?}", buf_len_histo));
let item: Sitemty<ChannelEvents> = Ok(StreamItem::Log(item));
let buf = match item.make_frame_dyn() {
Ok(k) => k,

View File

@@ -8,13 +8,14 @@ edition = "2024"
futures-util = "0.3.31"
pin-project = "1"
async-channel = "2.3.1"
scylla = "1.1"
scylla = "1.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
time = { version = "0.3.41", features = ["parsing", "formatting", "macros"] }
hashbrown = "0.15.3"
autoerr = "0.0"
daqbuf-err = { path = "../../../daqbuf-err" }
+log = { path = "../../../daqbuf-log", package = "daqbuf-log" }
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
query = { path = "../../../daqbuf-query", package = "daqbuf-query" }
items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" }
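
Besides the scylla bump, the new log entry uses Cargo's dependency renaming: the crate on disk is daqbuf-log, but it is imported under the name log. That is presumably why the source changes below can write

use log::log_item_emit as lg; // `log` here is daqbuf-log, not the crates.io facade

and have the info!/debug! wrapper macros route through the workspace crate.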

View File

@@ -123,6 +123,10 @@ impl FromBinned {
}
fn build_day1_jobs(&mut self) {
+{
+let item = items_0::streamitem::LogItem::info("some log string".into());
+streams::logqueue::push_log_item(item);
+}
let mut jobs = VecDeque::new();
let pbp1 = PrebinnedPartitioning::Day1;
self.push_string(format!("binrange {:?}", self.binrange));

View File

@@ -12,8 +12,8 @@ use items_0::streamitem::LogItem;
use items_0::streamitem::Sitemty3;
use items_0::streamitem::StreamItem;
use items_0::streamitem::sitem3_data;
+use log::log_item_emit as lg;
use netpod::DtMs;
-use netpod::log;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
use std::collections::VecDeque;
@@ -22,8 +22,8 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
-macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
-macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
+macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ); }
+macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
autoerr::create_error_v1!(
name(Error, "BinWriteIndexRtStream"),
@@ -81,6 +81,11 @@ impl BinWriteIndexRtStream {
range: NanoRange,
scyqueue: ScyllaQueue,
) -> Self {
lg::info!("============================ log item emitted from binwriteindex.rs");
lg::info!(
"============================ log item emitted from binwriteindex.rs WITH PARAM {}",
42
);
info!("{}::new INFO/DEBUG test", Self::type_name());
debug!("{}::new", Self::type_name());
let (msp_beg, lsp_beg) = pbp.msp_lsp(range.beg_ts().to_ts_ms());
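
The wrapper-macro change from $($arg:expr),* to $($arg:tt)* is the standard forwarding fix: a comma-separated expr list rejects input the wrapped macro itself accepts, such as trailing commas, whereas raw token trees are passed through verbatim. A minimal standalone illustration:

macro_rules! via_expr { ($($arg:expr),*) => ( println!($($arg),*) ); }
macro_rules! via_tt { ($($arg:tt)*) => ( println!($($arg)*) ); }

fn main() {
    via_tt!("n = {}", 1,); // ok: the trailing comma is forwarded as-is
    via_expr!("n = {}", 1); // ok without a trailing comma
    // via_expr!("n = {}", 1,); // no match: `),*` permits no trailing comma
}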