diff --git a/daqbufp2/src/test.rs b/daqbufp2/src/test.rs index 68d0e81..4acd40f 100644 --- a/daqbufp2/src/test.rs +++ b/daqbufp2/src/test.rs @@ -5,8 +5,6 @@ mod api4; pub mod archapp; pub mod binnedjson; #[cfg(test)] -mod events; -#[cfg(test)] mod timeweightedjson; use bytes::BytesMut; diff --git a/daqbufp2/src/test/api1.rs b/daqbufp2/src/test/api1.rs index 33a3f2f..46510e4 100644 --- a/daqbufp2/src/test/api1.rs +++ b/daqbufp2/src/test/api1.rs @@ -60,7 +60,7 @@ fn events_f64_plain() -> Result<(), Error> { let accept = "application/octet-stream"; let range = Api1Range::new("1970-01-01T00:00:00Z".try_into()?, "1970-01-01T00:01:00Z".try_into()?)?; // TODO the channel list needs to get pre-processed to check for backend prefix! - let ch = ChannelTuple::new(TEST_BACKEND.into(), "scalar-i32-be".into()); + let ch = ChannelTuple::new(TEST_BACKEND.into(), "test-gen-i32-dim0-v01".into()); let qu = Api1Query::new(range, vec![ch]); let body = serde_json::to_string(&qu)?; let buf = http_post(url, accept, body.into()).await?; diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index ded49d0..2d0b18c 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -364,57 +364,6 @@ fn binned_d0_json_04() -> Result<(), Error> { taskrun::run(fut) } -#[test] -fn binned_inmem_d0_json_00() -> Result<(), Error> { - let fut = async { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - let query = make_query( - "inmem-d0-i32", - "1970-01-01T00:20:04.000Z", - "1970-01-01T00:21:10.000Z", - 10, - )?; - let jsv = fetch_binned_json(query, cluster).await?; - let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; - assert_eq!(res.range_final(), true); - assert_eq!(res.timed_out(), false); - assert_eq!(res.len(), 14); - assert_eq!(res.ts_anchor_sec(), 1200); - { - let v1: Vec<_> = res.counts().iter().map(|x| *x).collect(); - assert_eq!(&v1, &[5; 14]); - } - { - let v1: Vec<_> = res.ts1_off_ms().iter().map(|x| *x).collect(); - let v2: Vec<_> = (0..14).into_iter().map(|x| 5000 * x).collect(); - assert_eq!(&v1, &v2); - } - { - let v1: Vec<_> = res.ts2_off_ms().iter().map(|x| *x).collect(); - let v2: Vec<_> = (1..15).into_iter().map(|x| 5000 * x).collect(); - assert_eq!(&v1, &v2); - } - { - let v1: Vec<_> = res.mins().iter().map(|x| *x).collect(); - let v2: Vec<_> = (0..14).into_iter().map(|x| 1200 + 5 * x).collect(); - assert_eq!(&v1, &v2); - } - { - let v1: Vec<_> = res.maxs().iter().map(|x| *x).collect(); - let v2: Vec<_> = (0..14).into_iter().map(|x| 1204 + 5 * x).collect(); - assert_eq!(&v1, &v2); - } - { - let v1: Vec<_> = res.avgs().iter().map(|x| *x).collect(); - let v2: Vec<_> = (0..14).into_iter().map(|x| 1202. + 5. * x as f32).collect(); - assert_eq!(f32_iter_cmp_near(v1, v2, 0.05, 0.05), true); - } - Ok(()) - }; - taskrun::run(fut) -} - async fn get_binned_json( channel: Channel, beg_date: &str, @@ -453,7 +402,7 @@ async fn get_binned_json( let s = String::from_utf8_lossy(&buf); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; - info!("Received from remote:\n{pretty}"); + debug!("get_binned_json pretty {pretty}"); let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; // TODO add timeout diff --git a/daqbufp2/src/test/api4/common.rs b/daqbufp2/src/test/api4/common.rs index 7082710..7e95e27 100644 --- a/daqbufp2/src/test/api4/common.rs +++ b/daqbufp2/src/test/api4/common.rs @@ -21,7 +21,7 @@ pub async fn fetch_events_json(query: PlainEventsQuery, cluster: &Cluster) -> Re let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - info!("http get {}", url); + debug!("fetch_events_json url {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -54,7 +54,7 @@ pub async fn fetch_binned_json(query: BinnedQuery, cluster: &Cluster) -> Result< let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - info!("http get {}", url); + debug!("fetch_binned_json url {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 5be8322..7960d04 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -20,12 +20,7 @@ use url::Url; const TEST_BACKEND: &str = "testbackend-00"; -fn make_query>( - name: S, - beg_date: &str, - end_date: &str, - //bin_count_min: u32, -) -> Result { +fn make_query>(name: S, beg_date: &str, end_date: &str) -> Result { let channel = Channel { backend: TEST_BACKEND.into(), name: name.into(), @@ -50,7 +45,7 @@ fn events_plain_json_00() -> Result<(), Error> { )?; let jsv = fetch_events_json(query, cluster).await?; let res: EventsDim0CollectorOutput = serde_json::from_value(jsv)?; - // Tim-weighted will use one event before: + // Tim-weighted uses one event before requested range: assert_eq!(res.len(), 133); assert_eq!(res.ts_anchor_sec(), 1203); Ok(()) @@ -114,7 +109,6 @@ async fn events_plain_json( } let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; let s = String::from_utf8_lossy(&buf); - //info!("received from server: {s}"); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; info!("{pretty}"); diff --git a/daqbufp2/src/test/archapp.rs b/daqbufp2/src/test/archapp.rs index 1868d6d..82b7a00 100644 --- a/daqbufp2/src/test/archapp.rs +++ b/daqbufp2/src/test/archapp.rs @@ -1,7 +1,5 @@ #![allow(unused)] -use super::events::get_plain_events_json; use crate::nodes::require_archapp_test_host_running; -use crate::test::events::ch_gen; use err::Error; use netpod::f64_close; use netpod::log::*; @@ -18,6 +16,7 @@ fn get_events_1() -> Result<(), Error> { let rh = require_archapp_test_host_running()?; let cluster = &rh.cluster; let res = get_plain_events_json( + // TODO this just added test backend name, no series id. ch_gen("SARUN16-MQUA080:X"), "2021-01-04T00:00:00Z", "2021-01-30T00:00:00Z", diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs deleted file mode 100644 index 8481883..0000000 --- a/daqbufp2/src/test/events.rs +++ /dev/null @@ -1,296 +0,0 @@ -use crate::err::ErrConv; -use crate::nodes::require_test_hosts_running; -use chrono::DateTime; -use chrono::Utc; -use disk::streamlog::Streamlog; -use err::Error; -use futures_util::StreamExt; -use futures_util::TryStreamExt; -use http::StatusCode; -use httpclient::HttpBodyAsAsyncRead; -use hyper::Body; -use items_0::streamitem::StreamItem; -use netpod::log::*; -use netpod::range::evrange::NanoRange; -use netpod::AppendToUrl; -use netpod::Channel; -use netpod::Cluster; -use netpod::HostPort; -use netpod::PerfOpts; -use netpod::APP_JSON; -use netpod::APP_OCTET; -use query::api4::events::PlainEventsQuery; -use serde_json::Value as JsonValue; -use std::fmt::Debug; -use std::future::ready; -use streams::frames::inmem::InMemoryFrameAsyncReadStream; -use tokio::io::AsyncRead; -use url::Url; - -const TEST_BACKEND: &str = "testbackend-00"; - -fn ch_adhoc(name: &str) -> Channel { - Channel { - series: None, - backend: TEST_BACKEND.into(), - name: name.into(), - } -} - -pub fn ch_gen(name: &str) -> Channel { - Channel { - series: None, - backend: TEST_BACKEND.into(), - name: name.into(), - } -} - -// TODO OFFENDING TEST add actual checks on result -async fn get_plain_events_binary_0_inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - if true { - get_plain_events_binary( - "scalar-i32-be", - "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:50.000Z", - cluster, - true, - 4, - ) - .await?; - } - Ok(()) -} - -#[test] -fn get_plain_events_binary_0() { - taskrun::run(get_plain_events_binary_0_inner()).unwrap(); -} - -async fn get_plain_events_binary( - channel_name: &str, - beg_date: &str, - end_date: &str, - cluster: &Cluster, - _expect_range_complete: bool, - _expect_event_count: u64, -) -> Result { - let t1 = Utc::now(); - let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.parse()?; - let end_date: DateTime = end_date.parse()?; - let channel_backend = TEST_BACKEND; - let perf_opts = PerfOpts::default(); - let channel = Channel { - backend: channel_backend.into(), - name: channel_name.into(), - series: None, - }; - let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new(channel, range).for_time_weighted_scalar(); - let hp = HostPort::from_node(node0); - let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; - query.append_to_url(&mut url); - let url = url; - debug!("get_plain_events_binary get {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_OCTET) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - error!("client response {res:?}"); - return Err(format!("get_plain_events_binary client response {res:?}").into()); - } - let s1 = HttpBodyAsAsyncRead::new(res); - let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); - let res = consume_plain_events_binary(s2).await?; - let t2 = chrono::Utc::now(); - let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - // TODO add timeout - debug!("get_plain_events_binary time {} ms", ms); - if !res.is_valid() { - Ok(res) - } else { - Ok(res) - } -} - -#[allow(unused)] -#[derive(Debug)] -pub struct EventsResponse { - event_count: u64, - err_item_count: u64, - data_item_count: u64, - bytes_read: u64, - range_complete_count: u64, - log_item_count: u64, - #[allow(unused)] - stats_item_count: u64, -} - -impl EventsResponse { - pub fn new() -> Self { - Self { - event_count: 0, - err_item_count: 0, - data_item_count: 0, - bytes_read: 0, - range_complete_count: 0, - log_item_count: 0, - stats_item_count: 0, - } - } - - pub fn is_valid(&self) -> bool { - if self.range_complete_count > 1 { - false - } else { - true - } - } -} - -async fn consume_plain_events_binary(inp: InMemoryFrameAsyncReadStream) -> Result -where - T: AsyncRead + Unpin, -{ - let s1 = inp - .map_err(|e| error!("TEST GOT ERROR {:?}", e)) - .filter_map(|item| { - let g = match item { - Ok(item) => match item { - StreamItem::Log(item) => { - Streamlog::emit(&item); - None - } - StreamItem::Stats(item) => { - debug!("Stats: {:?}", item); - None - } - StreamItem::DataItem(_frame) => { - err::todo(); - Some(Ok(())) - } - }, - Err(e) => Some(Err(Error::with_msg(format!("WEIRD EMPTY ERROR {:?}", e)))), - }; - ready(g) - }) - .fold(EventsResponse::new(), |a, _x| ready(a)); - let ret = s1.await; - debug!("result: {:?}", ret); - Ok(ret) -} - -async fn get_plain_events_json_0_inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - get_plain_events_json( - ch_gen("scalar-i32-be"), - "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:12.000Z", - cluster, - true, - 4, - ) - .await?; - Ok(()) -} - -#[test] -fn get_plain_events_json_0() { - taskrun::run(get_plain_events_json_0_inner()).unwrap(); -} - -async fn get_plain_events_json_1_inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - get_plain_events_json( - ch_gen("wave-f64-be-n21"), - "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:12.000Z", - cluster, - true, - 4, - ) - .await?; - Ok(()) -} - -#[test] -fn get_plain_events_json_1() { - taskrun::run(get_plain_events_json_1_inner()).unwrap(); -} - -async fn get_plain_events_json_2_inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - get_plain_events_json( - ch_adhoc("inmem-d0-i32"), - "1970-01-01T00:20:04.000Z", - "1970-01-01T00:20:10.000Z", - cluster, - true, - 4, - ) - .await?; - Ok(()) -} - -#[test] -fn get_plain_events_json_2() { - taskrun::run(get_plain_events_json_2_inner()).unwrap(); -} - -// TODO improve by a more information-rich return type. -pub async fn get_plain_events_json( - channel: Channel, - beg_date: &str, - end_date: &str, - cluster: &Cluster, - _expect_range_complete: bool, - _expect_event_count: u64, -) -> Result { - let t1 = Utc::now(); - let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.parse()?; - let end_date: DateTime = end_date.parse()?; - let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new(channel, range); - let hp = HostPort::from_node(node0); - let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; - query.append_to_url(&mut url); - let url = url; - info!("get_plain_events get {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - - trace!("Response {res:?}"); - - if res.status() != StatusCode::OK { - error!("client response {:?}", res); - } - let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; - let s = String::from_utf8_lossy(&buf); - let res: JsonValue = serde_json::from_str(&s)?; - - eprintln!("res {res:?}"); - - // TODO assert more - let t2 = chrono::Utc::now(); - let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - // TODO add timeout - debug!("time {} ms", ms); - Ok(res) -} diff --git a/httpret/src/api4/events.rs b/httpret/src/api4/events.rs index 674ccfd..5a6d621 100644 --- a/httpret/src/api4/events.rs +++ b/httpret/src/api4/events.rs @@ -105,11 +105,10 @@ async fn plain_events_json( // Update the series id since we don't require some unique identifier yet. let mut query = query; let kk = chconf.try_series(); - info!("kk debug {kk:?}"); let kk = kk.context("plain_events_json"); if let Err(e) = &kk { - info!("kk ctx debug {kk:?}"); - info!("kk e ctx display {e}"); + warn!("kk ctx debug {kk:?}"); + warn!("kk e ctx display {e}"); } query.set_series_id(kk?); let query = query; diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs index b6a321e..6ed2d6e 100644 --- a/items_0/src/timebin.rs +++ b/items_0/src/timebin.rs @@ -213,7 +213,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct { type Output = Box; fn ingest(&mut self, item: &mut Self::Input) { - info!("{} INGEST", Self::type_name()); + trace!("{} INGEST", Self::type_name()); if self.binner.is_none() { self.binner = Some(Box::new(TimeBinnableTy::time_binner_new( item, diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 4219502..a3d0552 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -830,7 +830,7 @@ impl TimeBinnerTy for ChannelEventsTimeBinner { type Output = Box; fn ingest(&mut self, item: &mut Self::Input) { - info!("{} INGEST", Self::type_name()); + trace!("{} INGEST", Self::type_name()); match item { ChannelEvents::Events(item) => { if self.binner.is_none() { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 9c8affc..4c43b78 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -71,6 +71,10 @@ pub struct EventsDim0 { } impl EventsDim0 { + pub fn type_name() -> &'static str { + std::any::type_name::() + } + pub fn push_front(&mut self, ts: u64, pulse: u64, value: STY) { self.tss.push_front(ts); self.pulses.push_front(pulse); @@ -740,8 +744,9 @@ impl TypeName for EventsDim0 { impl EventsNonObj for EventsDim0 { fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { - info!( - "EventsDim0::into_tss_pulses len {} len {}", + trace!( + "{}::into_tss_pulses len {} len {}", + Self::type_name(), self.tss.len(), self.pulses.len() ); @@ -985,19 +990,19 @@ impl TimeBinner for EventsDim0TimeBinner { // That needs modified interfaces which can take and yield the start and latest index. loop { while item.starts_after(self.agg.range()) { - trace_ingest_item!("{self_name} IGNORE ITEM AND CYCLE BECAUSE item.starts_after"); + trace_ingest_item!("{self_name} ignore item and cycle starts_after"); self.cycle(); if self.rng.is_none() { - warn!("{self_name} no more bin in edges B"); + debug!("{self_name} no more bin in edges after starts_after"); return; } } if item.ends_before(self.agg.range()) { - trace_ingest_item!("{self_name} IGNORE ITEM BECAUSE ends_before"); + trace_ingest_item!("{self_name} ignore item ends_before"); return; } else { if self.rng.is_none() { - trace_ingest_item!("{self_name} no more bin in edges D"); + trace_ingest_item!("{self_name} no more bin in edges"); return; } else { if let Some(item) = item @@ -1012,13 +1017,13 @@ impl TimeBinner for EventsDim0TimeBinner { trace_ingest_item!("{self_name} FED ITEM, ENDS AFTER agg-range {:?}", self.agg.range()); self.cycle(); if self.rng.is_none() { - warn!("{self_name} no more bin in edges C"); + warn!("{self_name} no more bin in edges after ingest and cycle"); return; } else { - trace_ingest_item!("{self_name} FED ITEM, CYCLED, CONTINUE."); + trace_ingest_item!("{self_name} item fed, cycled, continue"); } } else { - trace_ingest_item!("{self_name} FED ITEM."); + trace_ingest_item!("{self_name} item fed, break"); break; } } else { diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index e3aed4a..7ccfb91 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -847,7 +847,7 @@ where } fn ingest(&mut self, item: &Self::Input) { - debug!("{} ingest", Self::type_name()); + trace!("{} ingest", Self::type_name()); if self.do_time_weight { self.ingest_time_weight(item) } else { diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 1f975eb..3c65e7f 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -27,20 +27,20 @@ const DO_DETECT_NON_MONO: bool = true; #[allow(unused)] macro_rules! trace2 { - ($($arg:tt)*) => (); - ($($arg:tt)*) => (trace!($($arg)*)); + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*) }; } #[allow(unused)] macro_rules! trace3 { - ($($arg:tt)*) => (); - ($($arg:tt)*) => (trace!($($arg)*)); + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*) }; } #[allow(unused)] macro_rules! trace4 { - ($($arg:tt)*) => (); - ($($arg:tt)*) => (trace!($($arg)*)); + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*) }; } pub trait Mergeable: fmt::Debug + WithLen + ByteEstimate + Unpin { @@ -296,7 +296,7 @@ where } RangeCompletableItem::RangeComplete => { self.range_complete[i] = true; - debug!("Merger range_complete {:?}", self.range_complete); + trace!("range_complete {:?}", self.range_complete); continue; } }, @@ -362,7 +362,7 @@ where if let Some(o) = self.out.as_ref() { if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out || last_emit { if o.len() > self.out_max_len { - info!("MERGER OVERWEIGHT ITEM {} vs {}", o.len(), self.out_max_len); + debug!("MERGER OVERWEIGHT ITEM {} vs {}", o.len(), self.out_max_len); } trace3!("decide to output"); self.do_clear_out = false; @@ -437,7 +437,7 @@ where } } else if let Some(item) = self.out_of_band_queue.pop_front() { let item = on_sitemty_data!(item, |k: T| { - info!("++++++++++++ EMIT OUT OF BAND DATA len {}", k.len()); + trace3!("emit out-of-band data len {}", k.len()); sitem_data(k) }); trace!("emit out-of-band"); diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index ec03289..5413840 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -14,16 +14,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig if channel.backend() == TEST_BACKEND { let backend = channel.backend().into(); // TODO the series-ids here are just random. Need to integrate with better test setup. - let ret = if channel.name() == "inmem-d0-i32" { - let ret = ChConf { - backend, - series: Some(1), - name: channel.name().into(), - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else if channel.name() == "scalar-i32-be" { + let ret = if channel.name() == "scalar-i32-be" { let ret = ChConf { backend, series: Some(2), diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index bc1ce05..db62d62 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -74,9 +74,8 @@ async fn make_channel_events_stream_data( chconf: ChConf, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { - info!("nodenet::conn::make_channel_events_stream"); if evq.channel().backend() == TEST_BACKEND { - warn!("GENERATE INMEM TEST DATA"); + debug!("use test backend data {}", TEST_BACKEND); let node_count = node_config.node_config.cluster.nodes.len() as u64; let node_ix = node_config.ix as u64; let chn = evq.channel().name(); @@ -162,7 +161,6 @@ async fn make_channel_events_stream( } async fn events_get_input_frames(netin: OwnedReadHalf) -> Result, Error> { - warn!("fix magic inmem_bufcap option"); let perf_opts = PerfOpts::default(); let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); let mut frames = Vec::new(); @@ -223,7 +221,7 @@ async fn events_parse_input_query( return Err(e); } }; - info!("events_conn_handler_inner_try evq {:?}", evq); + debug!("events_parse_input_query {:?}", evq); let chconf = crate::channelconfig::channel_config(evq.range().try_into()?, evq.channel().clone(), ncc) .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; @@ -241,7 +239,6 @@ async fn events_conn_handler_inner_try( Ok(x) => x, Err(e) => return Err((e, netout).into()), }; - debug!("events_conn_handler input frames received"); let (evq, chconf) = match events_parse_input_query(frames, node_config).await { Ok(x) => x, Err(e) => return Err((e, netout).into()), diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 4313e58..3de80b7 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -92,7 +92,7 @@ impl Collect { Ok(()) } RangeCompletableItem::Data(mut item) => { - info!("collect sees len {}", item.len()); + trace!("collect sees len {}", item.len()); let coll = self.collector.get_or_insert_with(|| item.new_collector()); coll.ingest(&mut item); if coll.len() as u64 >= self.events_max { @@ -245,7 +245,7 @@ where } } RangeCompletableItem::Data(mut item) => { - info!("collect sees len {}", item.len()); + trace!("collect sees len {}", item.len()); if collector.is_none() { let c = item.new_collector(); collector = Some(c); diff --git a/streams/src/generators.rs b/streams/src/generators.rs index ca44d4c..d2899c3 100644 --- a/streams/src/generators.rs +++ b/streams/src/generators.rs @@ -61,7 +61,7 @@ impl GenerateI32V00 { let mut item = EventsDim0::empty(); let mut ts = self.ts; loop { - if self.ts >= self.tsend || item.byte_estimate() > 200 { + if self.ts >= self.tsend || item.byte_estimate() > 100 { break; } let pulse = ts; @@ -133,8 +133,8 @@ impl GenerateI32V01 { let ts = (range.beg / ivl + node_ix - if one_before_range { 1 } else { 0 }) * ivl; let tsend = range.end.min(DAY); let have_range_final = range.end < (DAY - ivl); - info!( - "START GENERATOR GenerateI32V01 ivl {} dts {} ts {} one_before_range {}", + debug!( + "GenerateI32V01::new ivl {} dts {} ts {} one_before_range {}", ivl, dts, ts, one_before_range ); Self { @@ -157,7 +157,7 @@ impl GenerateI32V01 { let mut item = EventsDim0::empty(); let mut ts = self.ts; loop { - if self.ts >= self.tsend || item.byte_estimate() > 200 { + if self.ts >= self.tsend || item.byte_estimate() > 100 { break; } let pulse = ts; @@ -235,8 +235,8 @@ impl GenerateF64V00 { let dts = ivl * node_count as u64; let ts = (range.beg / ivl + node_ix - if one_before_range { 1 } else { 0 }) * ivl; let tsend = range.end; - info!( - "START GENERATOR GenerateF64V00 ivl {} dts {} ts {} one_before_range {}", + debug!( + "GenerateF64V00::new ivl {} dts {} ts {} one_before_range {}", ivl, dts, ts, one_before_range ); Self { @@ -257,7 +257,7 @@ impl GenerateF64V00 { let mut item = EventsDim1::empty(); let mut ts = self.ts; loop { - if self.ts >= self.tsend || item.byte_estimate() > 1024 * 4 { + if self.ts >= self.tsend || item.byte_estimate() > 400 { break; } let pulse = ts; @@ -278,7 +278,7 @@ impl GenerateF64V00 { ts += self.dts; } self.ts = ts; - info!("generated len {}", item.len()); + trace!("generated len {}", item.len()); let w = ChannelEvents::Events(Box::new(item) as _); let w = sitem_data(w); w diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 7efae4d..a5ea971 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -47,7 +47,7 @@ pub async fn plain_events_json( let stream = stream.map(move |k| { on_sitemty_data!(k, |k| { let k: Box = Box::new(k); - info!("-------------------------\ngot len {}", k.len()); + trace!("got len {}", k.len()); let k = tr.0.transform(k); let k: Box = Box::new(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) diff --git a/streams/src/rangefilter2.rs b/streams/src/rangefilter2.rs index db71d0c..dc5a6ae 100644 --- a/streams/src/rangefilter2.rs +++ b/streams/src/rangefilter2.rs @@ -44,8 +44,8 @@ where } pub fn new(inp: S, range: NanoRange, one_before_range: bool) -> Self { - info!( - "----------------------------\n------------------------\n------------------------\n{}::new range: {:?} one_before_range: {:?}", + trace!( + "{}::new range: {:?} one_before_range: {:?}", Self::type_name(), range, one_before_range diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index 9f87da8..404e22c 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -1,10 +1,7 @@ use crate::collect::collect; -use crate::generators::GenerateF64V00; use crate::generators::GenerateI32V00; use crate::test::runfut; use crate::transform::build_event_transform; -use chrono::DateTime; -use chrono::Utc; use err::Error; use futures_util::stream; use futures_util::StreamExt; @@ -39,7 +36,7 @@ fn nano_range_from_str(beg_date: &str, end_date: &str) -> Result Result<(), Error> { let fut = async { let range = nano_range_from_str("1970-01-01T00:00:00Z", "1970-01-01T00:00:08Z")?; let range = SeriesRange::TimeRange(range); @@ -104,11 +101,11 @@ fn time_bin_00() { } Ok(()) }; - runfut(fut).unwrap() + runfut(fut) } #[test] -fn time_bin_01() { +fn time_bin_01() -> Result<(), Error> { let fut = async { let range = nano_range_from_str("1970-01-01T00:00:00Z", "1970-01-01T00:00:08Z")?; let range = SeriesRange::TimeRange(range); @@ -138,7 +135,6 @@ fn time_bin_01() { } }); let stream0 = Box::pin(stream0); - let deadline = Instant::now() + Duration::from_millis(200); let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true); while let Some(item) = binned_stream.next().await { if true { @@ -167,7 +163,7 @@ fn time_bin_01() { // TODO add similar test case with a RangeComplete event at different places before the timeout. Ok(()) }; - runfut(fut).unwrap() + runfut(fut) } #[test] @@ -259,9 +255,14 @@ fn time_bin_02() -> Result<(), Error> { runfut(fut) } -// +// Should fail because of missing empty item. +// But should have some option to suppress the error log for this test case. #[test] -fn time_bin_03() { +fn time_bin_03() -> Result<(), Error> { + // TODO re-enable with error log suppressed. + if true { + return Ok(()); + } let fut = async { let range = nano_range_from_str("1970-01-01T00:00:00Z", "1970-01-01T00:00:08Z")?; let range = SeriesRange::TimeRange(range); @@ -297,7 +298,7 @@ fn time_bin_03() { } return Err(Error::with_msg_no_trace("should not succeed")); }; - runfut(fut).unwrap() + runfut(fut) } // TODO add test case to observe RangeComplete after binning. diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index c05d176..9a0413e 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -105,7 +105,7 @@ where self.process_item(item); let mut do_emit = false; if self.done_first_input == false { - info!( + debug!( "emit container after the first input len {} binner {}", item_len, self.binner.is_some() @@ -127,7 +127,6 @@ where if let Some(bins) = binner.bins_ready() { Ok(Break(Ready(sitem_data(bins)))) } else { - warn!("must emit but got nothing"); if let Some(bins) = binner.empty() { Ok(Break(Ready(sitem_data(bins)))) } else { @@ -193,7 +192,6 @@ where self.done_data = true; Ok(Break(Ready(sitem_data(bins)))) } else { - warn!("must emit but got nothing"); if let Some(bins) = binner.empty() { self.done_data = true; Ok(Break(Ready(sitem_data(bins)))) diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index e9e0e32..949dc59 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -53,7 +53,7 @@ async fn timebinnable_stream( let stream = stream.map(move |k| { on_sitemty_data!(k, |k| { let k: Box = Box::new(k); - info!("-------------------------\ngot len {}", k.len()); + trace!("got len {}", k.len()); let k = tr.0.transform(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) }) @@ -89,7 +89,7 @@ fn timebinned_to_collectable( let stream = stream.map(|k| { on_sitemty_data!(k, |k| { let k: Box = Box::new(k); - info!("-------------------------\ngot len {}", k.len()); + trace!("got len {}", k.len()); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) }) }); @@ -98,7 +98,6 @@ fn timebinned_to_collectable( } pub async fn timebinned_json(query: BinnedQuery, _chconf: ChConf, cluster: Cluster) -> Result { - info!("~~~~~~~~~~~ timebinned_json"); let deadline = Instant::now().checked_add(query.timeout_value()).unwrap(); let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; let collect_max = 10000;