From 2f608a8a4e9ee9f9e143c4871e357cf2ce626b43 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 9 Nov 2021 19:18:03 +0100 Subject: [PATCH] Reduce log chatter --- archapp/src/archeng.rs | 13 +- archapp/src/archeng/datablock.rs | 2 +- archapp/src/archeng/datablockstream.rs | 11 +- archapp/src/archeng/indexfiles.rs | 8 +- archapp/src/archeng/indextree.rs | 4 +- archapp/src/test.rs | 9 +- archapp/src/timed.rs | 2 +- daqbufp2/src/test/binnedbinary.rs | 14 +- daqbufp2/src/test/binnedjson.rs | 8 +- daqbufp2/src/test/events.rs | 23 +- daqbufp2/src/test/timeweightedjson.rs | 9 +- disk/Cargo.toml | 5 +- disk/src/binned.rs | 8 +- disk/src/binned/query.rs | 2 +- disk/src/channelexec.rs | 4 +- disk/src/dataopen.rs | 11 +- disk/src/{lib.rs => disk.rs} | 7 +- disk/src/eventblobs.rs | 12 +- disk/src/eventchunker.rs | 4 +- disk/src/merge.rs | 15 +- disk/src/merge/mergedfromremotes.rs | 2 +- disk/src/raw/client.rs | 4 +- fsio/src/fsio.rs | 3 +- httpret/src/api1.rs | 32 +-- httpret/src/httpret.rs | 17 +- items/src/xbinnedscalarevents.rs | 2 +- netfetch/Cargo.toml | 1 + netfetch/src/netbuf.rs | 26 +++ netfetch/src/test.rs | 16 +- netfetch/src/zmtp.rs | 287 +++++++++++++++++++------ netpod/src/netpod.rs | 16 ++ nodenet/src/conn.rs | 5 +- 32 files changed, 388 insertions(+), 194 deletions(-) rename disk/src/{lib.rs => disk.rs} (98%) diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index 32dd6ef..3e417f5 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -118,7 +118,7 @@ pub fn list_all_channels(node: &ChannelArchiver) -> Receiver { - info!("stats: {:?}", item); + debug!("stats: {:?}", item); } _ => {} } @@ -179,9 +179,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R } } JsVal(jsval) => { - if true { - info!("jsval: {}", serde_json::to_string(&jsval)?); - } + debug!("jsval: {}", serde_json::to_string(&jsval)?); } }, Err(e) => { @@ -208,9 +206,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R } } JsVal(jsval) => { - if true { - info!("jsval: {}", serde_json::to_string(&jsval)?); - } + debug!("jsval: {}", serde_json::to_string(&jsval)?); } }, Err(e) => { @@ -285,7 +281,8 @@ mod test { let mut data_file = open_read(data_path, stats).await?; let datafile_header = read_datafile_header(&mut data_file, datablock.data_header_pos(), stats).await?; let events = read_data_1(&mut data_file, &datafile_header, range.clone(), false, stats).await?; - info!("read events: {:?}", events); + debug!("read events: {:?}", events); + // TODO assert more Ok(()) }; Ok(taskrun::run(fut).unwrap()) diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs index b99a012..346da35 100644 --- a/archapp/src/archeng/datablock.rs +++ b/archapp/src/archeng/datablock.rs @@ -513,7 +513,7 @@ pub async fn read_data_1( evs.values.push(value); } } - info!("parsed block with {} / {} events", ntot, evs.tss.len()); + debug!("parsed block with {} / {} events", ntot, evs.tss.len()); let evs = ScalarPlainEvents::Double(evs); let plain = PlainEvents::Scalar(evs); let item = EventsItem::Plain(plain); diff --git a/archapp/src/archeng/datablockstream.rs b/archapp/src/archeng/datablockstream.rs index 01511ec..4dea2cb 100644 --- a/archapp/src/archeng/datablockstream.rs +++ b/archapp/src/archeng/datablockstream.rs @@ -322,9 +322,9 @@ mod test { use futures_util::StreamExt; use items::eventsitem::EventsItem; use items::{LogItem, Sitemty, StatsItem, StreamItem}; + use netpod::log::*; use netpod::timeunits::SEC; - use netpod::{log::*, RangeFilterStats}; - use netpod::{Channel, NanoRange}; + use netpod::{Channel, NanoRange, RangeFilterStats}; use serde::Serialize; use std::collections::VecDeque; use std::path::PathBuf; @@ -358,7 +358,12 @@ mod test { let filtered = RangeFilter::<_, EventsItem>::new(datablocks, range, expand); let mut stream = filtered; while let Some(block) = stream.next().await { - info!("DatablockStream yields: {:?}", block); + match block { + Ok(_) => { + //TODO assert more + } + Err(e) => return Err(e), + } } Ok(()) }; diff --git a/archapp/src/archeng/indexfiles.rs b/archapp/src/archeng/indexfiles.rs index bdf1fd2..7776cd6 100644 --- a/archapp/src/archeng/indexfiles.rs +++ b/archapp/src/archeng/indexfiles.rs @@ -544,7 +544,13 @@ pub async fn index_files_index_ref + Send>( let index_files_index_path = index_files_index_path.into(); let index_files_index = { let timed1 = Timed::new("slurp_index_bytes"); - let mut index_files_index = open_read(index_files_index_path, stats).await?; + let mut index_files_index = match open_read(index_files_index_path.clone(), stats).await { + Ok(k) => Ok(k), + Err(e) => { + warn!("can not open indexfile {:?}", index_files_index_path); + Err(e) + } + }?; let mut buf = vec![0; 1024 * 1024 * 50]; let mut ntot = 0; loop { diff --git a/archapp/src/archeng/indextree.rs b/archapp/src/archeng/indextree.rs index 97f3f50..084d92f 100644 --- a/archapp/src/archeng/indextree.rs +++ b/archapp/src/archeng/indextree.rs @@ -125,7 +125,6 @@ impl IndexFileBasics { rb.fill_min(min0).await?; let buf = rb.data(); let entry = parse_name_hash_channel_entry(buf, self.hver.as_ref())?; - info!("parsed entry {:?}", entry); pos = entry.next; entries.push(entry); } @@ -1155,7 +1154,8 @@ mod test { let mut i1 = 0; let mut ts_max = Nanos::from_ns(0); while let Some(rec) = iter.next().await? { - info!("GOT RECORD: {:?} {:?}", rec.beg, rec.target); + // TODO assert + debug!("GOT RECORD: {:?} {:?}", rec.beg, rec.target); if rec.beg <= ts_max { return Err(Error::with_msg_no_trace("BAD ORDER")); } diff --git a/archapp/src/test.rs b/archapp/src/test.rs index 5e35cd9..da2a5e3 100644 --- a/archapp/src/test.rs +++ b/archapp/src/test.rs @@ -23,6 +23,7 @@ fn read_pb_00() -> Result<(), Error> { .join("X:2021_01.pb"); let f1 = tokio::fs::read(path).await?; let mut j1 = 0; + // TODO assert more loop { let mut i2 = usize::MAX; for (i1, &k) in f1[j1..].iter().enumerate() { @@ -32,17 +33,17 @@ fn read_pb_00() -> Result<(), Error> { } } if i2 != usize::MAX { - info!("got NL {} .. {}", j1, i2); + debug!("got NL {} .. {}", j1, i2); let m = unescape_archapp_msg(&f1[j1..i2], vec![])?; if j1 == 0 { let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m).unwrap(); - info!("got payload_info: {:?}", payload_info); + debug!("got payload_info: {:?}", payload_info); } else { let scalar_double = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m).unwrap(); - info!("got scalar_double: {:?}", scalar_double); + debug!("got scalar_double: {:?}", scalar_double); } } else { - info!("no more packets"); + debug!("no more packets"); break; } j1 = i2 + 1; diff --git a/archapp/src/timed.rs b/archapp/src/timed.rs index 8a4d974..b4fc284 100644 --- a/archapp/src/timed.rs +++ b/archapp/src/timed.rs @@ -23,6 +23,6 @@ impl Drop for Timed { fn drop(&mut self) { let ts2 = Instant::now(); let dt = ts2.duration_since(self.ts1); - info!("Timed {} {:?}", self.name, dt); + debug!("Timed {} {:?}", self.name, dt); } } diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index e97632e..a5c737d 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -110,7 +110,7 @@ where let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - info!("get_binned_channel get {}", url); + debug!("get_binned_channel get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -126,7 +126,7 @@ where let res = consume_binned_response::(s2).await?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms); + debug!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms); if !res.is_valid() { Err(Error::with_msg(format!("invalid response: {:?}", res))) } else if res.range_complete_count == 0 && expect_range_complete { @@ -186,7 +186,8 @@ where None } StreamItem::Stats(item) => { - info!("Stats: {:?}", item); + // TODO collect somewhere + debug!("Stats: {:?}", item); None } StreamItem::DataItem(frame) => { @@ -200,10 +201,7 @@ where Streamlog::emit(&item); Some(Ok(StreamItem::Log(item))) } - item => { - info!("TEST GOT ITEM {:?}", item); - Some(Ok(item)) - } + item => Some(Ok(item)), }, Err(e) => { error!("TEST GOT ERROR FRAME: {:?}", e); @@ -253,6 +251,6 @@ where ready(g) }); let ret = s1.await; - info!("BinnedResponse: {:?}", ret); + debug!("BinnedResponse: {:?}", ret); Ok(ret) } diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs index 36f705f..a775e81 100644 --- a/daqbufp2/src/test/binnedjson.rs +++ b/daqbufp2/src/test/binnedjson.rs @@ -98,7 +98,7 @@ async fn get_binned_json_common( let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; query.append_to_url(&mut url); let url = url; - info!("get_binned_json_common get {}", url); + debug!("get_binned_json_common get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -112,11 +112,11 @@ async fn get_binned_json_common( let res = hyper::body::to_bytes(res.into_body()).await?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - info!("get_binned_json_common DONE time {} ms", ms); + debug!("get_binned_json_common DONE time {} ms", ms); let res = String::from_utf8_lossy(&res).to_string(); - //info!("get_binned_json_common res: {}", res); let res: serde_json::Value = serde_json::from_str(res.as_str())?; - info!( + // TODO assert more + debug!( "result from endpoint: --------------\n{}\n--------------", serde_json::to_string_pretty(&res)? ); diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 2e65219..258eabe 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -67,7 +67,7 @@ where let mut url = Url::parse(&format!("http://{}:{}", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - info!("get_plain_events get {}", url); + debug!("get_plain_events get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -83,7 +83,8 @@ where let res = consume_plain_events_binary::(s2).await?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - info!("time {} ms", ms); + // TODO add timeout + debug!("time {} ms", ms); if !res.is_valid() { Ok(res) } else { @@ -139,7 +140,7 @@ where None } StreamItem::Stats(item) => { - info!("Stats: {:?}", item); + debug!("Stats: {:?}", item); None } StreamItem::DataItem(frame) => { @@ -154,10 +155,7 @@ where Streamlog::emit(&item); Some(Ok(StreamItem::Log(item))) } - item => { - info!("TEST GOT ITEM {:?}", item); - Some(Ok(item)) - } + item => Some(Ok(item)), }, Err(e) => { error!("TEST GOT ERROR FRAME: {:?}", e); @@ -208,7 +206,7 @@ where ready(g) }); let ret = s1.await; - info!("result: {:?}", ret); + debug!("result: {:?}", ret); Ok(ret) } @@ -275,7 +273,7 @@ async fn get_plain_events_json( let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - info!("get_plain_events get {}", url); + debug!("get_plain_events get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -288,10 +286,11 @@ async fn get_plain_events_json( } let buf = hyper::body::to_bytes(res.into_body()).await?; let s = String::from_utf8_lossy(&buf); - let res: JsonValue = serde_json::from_str(&s)?; - info!("GOT: {}", serde_json::to_string_pretty(&res)?); + let _res: JsonValue = serde_json::from_str(&s)?; + // TODO assert more let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - info!("time {} ms", ms); + // TODO add timeout + debug!("time {} ms", ms); Ok(()) } diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index d0cb677..ffd4535 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -175,7 +175,7 @@ async fn get_json_common( let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; query.append_to_url(&mut url); let url = url; - info!("get_json_common get {}", url); + debug!("get_json_common get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -189,11 +189,12 @@ async fn get_json_common( let res = hyper::body::to_bytes(res.into_body()).await?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - info!("get_json_common DONE time {} ms", ms); + // TODO add timeout + debug!("get_json_common DONE time {} ms", ms); let res = String::from_utf8_lossy(&res).to_string(); - //info!("get_json_common res: {}", res); let res: serde_json::Value = serde_json::from_str(res.as_str())?; - info!( + // TODO assert these: + debug!( "result from endpoint: --------------\n{}\n--------------", serde_json::to_string_pretty(&res)? ); diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 3573ac9..712a3d5 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -2,7 +2,10 @@ name = "disk" version = "0.0.1-a.1" authors = ["Dominik Werder "] -edition = "2018" +edition = "2021" + +[lib] +path = "src/disk.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index b754bb7..7135e9a 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -82,7 +82,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { let perf_opts = PerfOpts { inmem_bufcap: 512 }; let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { Ok(Some(pre_range)) => { - info!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range); + debug!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range); if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { let msg = format!( "BinnedBinaryChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}", @@ -109,7 +109,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { Ok(Box::pin(s) as Pin> + Send>>) } Ok(None) => { - info!( + debug!( "BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {:?}", range ); @@ -326,7 +326,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { let perf_opts = PerfOpts { inmem_bufcap: 512 }; let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) { Ok(Some(pre_range)) => { - info!("BinnedJsonChannelExec found pre_range: {:?}", pre_range); + debug!("BinnedJsonChannelExec found pre_range: {:?}", pre_range); if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { let msg = format!( "BinnedJsonChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}", @@ -354,7 +354,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { Ok(Box::pin(s) as Pin> + Send>>) } Ok(None) => { - info!( + debug!( "BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {:?}", range ); diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index 4f99060..7b2adb0 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -342,7 +342,7 @@ impl FromUrl for BinnedQuery { .parse() .map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?, }; - info!("BinnedQuery::from_url {:?}", ret); + debug!("BinnedQuery::from_url {:?}", ret); Ok(ret) } } diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index a092da8..9990666 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -355,7 +355,7 @@ where Ok(item) => match item { StreamItem::Log(item) => { if do_log { - info!("collect_plain_events_json log {:?}", item); + debug!("collect_plain_events_json log {:?}", item); } } StreamItem::Stats(item) => match item { @@ -396,7 +396,7 @@ where } } let ret = serde_json::to_value(collector.result()?)?; - info!("Total duration: {:?}", total_duration); + debug!("Total duration: {:?}", total_duration); Ok(ret) } diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index ec46883..976ff24 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -247,7 +247,7 @@ async fn open_files_inner( } } let h = OpenedFileSet { timebin: tb, files: a }; - info!( + debug!( "----- open_files_inner giving OpenedFileSet with {} files", h.files.len() ); @@ -351,13 +351,13 @@ async fn open_expanded_files_inner( for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? { let w = position_file(&path, range, true, false).await?; if w.found { - info!("----- open_expanded_files_inner w.found for {:?}", path); + debug!("----- open_expanded_files_inner w.found for {:?}", path); a.push(w.file); found_pre = true; } } let h = OpenedFileSet { timebin: tb, files: a }; - info!( + debug!( "----- open_expanded_files_inner giving OpenedFileSet with {} files", h.files.len() ); @@ -387,7 +387,8 @@ async fn open_expanded_files_inner( p1 += 1; } } else { - info!("Could not find some event before the requested range, fall back to standard file list."); + // TODO emit statsfor this or log somewhere? + debug!("Could not find some event before the requested range, fall back to standard file list."); // Try to locate files according to non-expand-algorithm. open_files_inner(chtx, range, &channel_config, node).await?; } @@ -423,7 +424,7 @@ fn expanded_file_list() { while let Some(file) = files.next().await { match file { Ok(k) => { - info!("opened file: {:?}", k); + debug!("opened file: {:?}", k); paths.push(k.files); } Err(e) => { diff --git a/disk/src/lib.rs b/disk/src/disk.rs similarity index 98% rename from disk/src/lib.rs rename to disk/src/disk.rs index b750c1c..9569711 100644 --- a/disk/src/lib.rs +++ b/disk/src/disk.rs @@ -300,10 +300,10 @@ impl NeedMinBuffer { } } -// TODO remove this again +// TODO collect somewhere else impl Drop for NeedMinBuffer { fn drop(&mut self) { - info!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo); + debug!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo); } } @@ -355,7 +355,8 @@ impl Stream for NeedMinBuffer { Ready(Some(Err(e.into()))) } Ready(None) => { - info!("NeedMinBuffer histo: {:?}", self.buf_len_histo); + // TODO collect somewhere + debug!("NeedMinBuffer histo: {:?}", self.buf_len_histo); Ready(None) } Pending => Pending, diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 239c6c8..3ebd019 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -114,7 +114,7 @@ impl Stream for EventChunkerMultifile { let file = ofs.files.pop().unwrap(); let path = file.path; let msg = format!("handle OFS {:?}", ofs); - info!("{}", msg); + debug!("{}", msg); let item = LogItem::quick(Level::INFO, msg); match file.file { Some(file) => { @@ -141,12 +141,12 @@ impl Stream for EventChunkerMultifile { Ready(Some(Ok(StreamItem::Log(item)))) } else if ofs.files.len() == 0 { let msg = format!("handle OFS {:?} NO FILES", ofs); - info!("{}", msg); + debug!("{}", msg); let item = LogItem::quick(Level::INFO, msg); Ready(Some(Ok(StreamItem::Log(item)))) } else { let msg = format!("handle OFS MERGED {:?}", ofs); - warn!("{}", msg); + debug!("{}", msg); let item = LogItem::quick(Level::INFO, msg); let mut chunkers = vec![]; for of in ofs.files { @@ -255,7 +255,8 @@ mod test { Ok(item) => match item { StreamItem::DataItem(item) => match item { RangeCompletableItem::Data(item) => { - info!("item: {:?}", item.tss.iter().map(|x| x / MS).collect::>()); + // TODO assert more + debug!("item: {:?}", item.tss.iter().map(|x| x / MS).collect::>()); event_count += item.tss.len(); for ts in item.tss { tss.push(ts); @@ -280,7 +281,8 @@ mod test { end: DAY + MS * 100, }; let res = read_expanded_for_range(range, 0)?; - info!("got {:?}", res.1); + // TODO assert more + debug!("got {:?}", res.1); if res.0 != 3 { Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; } diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 9abdf9f..fd4a36f 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -46,10 +46,10 @@ pub struct EventChunker { unordered_warn_count: usize, } -// TODO remove again, use it explicitly impl Drop for EventChunker { fn drop(&mut self) { - info!( + // TODO collect somewhere + debug!( "EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}", self.decomp_dt_histo, self.item_len_emit_histo ); diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 5fed55d..ab6a789 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -39,10 +39,10 @@ pub struct MergedStream { stats_items: VecDeque, } -// TODO get rid, log info explicitly. impl Drop for MergedStream { fn drop(&mut self) { - info!( + // TODO collect somewhere + debug!( "MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}", self.batch_len_emit_histo ); @@ -204,7 +204,7 @@ where for ii in 0..batch.len() { aa.push(batch.ts(ii)); } - info!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa); + debug!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa); }; Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch))))) } else { @@ -265,7 +265,7 @@ where for ii in 0..batch.len() { aa.push(batch.ts(ii)); } - info!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa); + debug!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa); }; Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch))))) } else { @@ -374,9 +374,10 @@ mod test { let mut merged = MergedStream::new(inps); let mut cevs = CollectedEvents { tss: vec![] }; let mut i1 = 0; + // TODO assert more while let Some(item) = merged.next().await { if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item { - info!("item: {:?}", item); + debug!("item: {:?}", item); for ts in item.tss { cevs.tss.push(ts); } @@ -386,8 +387,8 @@ mod test { break; } } - info!("read {} data items", i1); - info!("cevs: {:?}", cevs); + debug!("read {} data items", i1); + debug!("cevs: {:?}", cevs); Ok(cevs) } diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs index a2eca59..cd13f61 100644 --- a/disk/src/merge/mergedfromremotes.rs +++ b/disk/src/merge/mergedfromremotes.rs @@ -32,7 +32,7 @@ where Sitemty<::Output>: FrameType, { pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { - info!("MergedFromRemotes evq {:?}", evq); + debug!("MergedFromRemotes evq {:?}", evq); let mut tcp_establish_futs = vec![]; for node in &cluster.nodes { let f = x_processed_stream_from_node::(evq.clone(), perf_opts.clone(), node.clone()); diff --git a/disk/src/raw/client.rs b/disk/src/raw/client.rs index dd69d1c..8d6d665 100644 --- a/disk/src/raw/client.rs +++ b/disk/src/raw/client.rs @@ -28,7 +28,7 @@ where ::Output: Unpin + 'static, Result::Output>>, err::Error>: FrameType, { - netpod::log::info!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw); + netpod::log::debug!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw); let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); @@ -48,7 +48,7 @@ pub async fn x_processed_event_blobs_stream_from_node( perf_opts: PerfOpts, node: Node, ) -> Result> + Send>>, Error> { - netpod::log::info!( + netpod::log::debug!( "x_processed_event_blobs_stream_from_node to: {}:{}", node.host, node.port_raw diff --git a/fsio/src/fsio.rs b/fsio/src/fsio.rs index a5bc2b8..6331c73 100644 --- a/fsio/src/fsio.rs +++ b/fsio/src/fsio.rs @@ -167,10 +167,11 @@ async fn write_1() -> Result<(), Error> { } #[cfg(test)] +#[allow(unused)] mod test { use super::*; - #[test] + //#[test] fn t1() -> Result<(), Error> { Ok(taskrun::run(write_1()).unwrap()) } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index a35026c..794ff86 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -10,9 +10,7 @@ use hyper::{Body, Client, Request, Response}; use items::{RangeCompletableItem, Sitemty, StreamItem}; use itertools::Itertools; use netpod::query::RawEventsQuery; -use netpod::{ - log::*, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape, APP_OCTET, -}; +use netpod::{log::*, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; use parse::channelconfig::{ extract_matching_config_entry, read_local_config, Config, ConfigEntry, MatchingConfigEntry, @@ -575,7 +573,7 @@ impl DataApiPython3DataStream { if !*header_out { let head = Api1ChannelHeader { name: channel.name.clone(), - ty: scalar_type_to_api3proto(&b.scalar_types[i1]).into(), + ty: b.scalar_types[i1].to_api3proto().into(), byte_order: if b.be[i1] { "BIG_ENDIAN".into() } else { @@ -688,7 +686,7 @@ impl Stream for DataApiPython3DataStream { let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 }; // TODO is this a good to place decide this? let s = if self.node_config.node_config.cluster.is_central_storage { - info!("Set up central storage stream"); + debug!("Set up central storage stream"); // TODO pull up this config let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let s = disk::raw::conn::make_local_event_blobs_stream( @@ -703,7 +701,7 @@ impl Stream for DataApiPython3DataStream { )?; Box::pin(s) as Pin> + Send>> } else { - info!("Set up merged remote stream"); + debug!("Set up merged remote stream"); let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new( evq, perf_opts, @@ -769,7 +767,7 @@ impl Stream for DataApiPython3DataStream { } pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("api1_binary_events uri: {:?} headers: {:?}", req.uri(), req.headers()); + debug!("api1_binary_events uri: {:?} headers: {:?}", req.uri(), req.headers()); let accept_def = ""; let accept = req .headers() @@ -784,12 +782,12 @@ pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCach error!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec())); return Err(Error::with_msg_no_trace("can not parse query")); }; - info!("got Api1Query: {:?}", qu); + debug!("got Api1Query: {:?}", qu); let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date); let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date); let beg_date = beg_date?; let end_date = end_date?; - info!("beg_date {:?} end_date {:?}", beg_date, end_date); + debug!("beg_date {:?} end_date {:?}", beg_date, end_date); //let url = Url::parse(&format!("dummy:{}", req.uri()))?; //let query = PlainEventsBinaryQuery::from_url(&url)?; // TODO add stricter check for types, check with client. @@ -832,22 +830,6 @@ pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCach return Ok(ret); } -fn scalar_type_to_api3proto(sty: &ScalarType) -> &'static str { - match sty { - ScalarType::U8 => "uint8", - ScalarType::U16 => "uint16", - ScalarType::U32 => "uint32", - ScalarType::U64 => "uint64", - ScalarType::I8 => "int8", - ScalarType::I16 => "int16", - ScalarType::I32 => "int32", - ScalarType::I64 => "int64", - ScalarType::F32 => "float32", - ScalarType::F64 => "float64", - ScalarType::BOOL => "bool", - } -} - fn shape_to_api3proto(sh: &Option>) -> Vec { match sh { None => vec![], diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 27354cd..6ea8d2f 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -50,7 +50,8 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?; let make_service = make_service_fn({ move |conn: &AddrStream| { - info!("new connection from {:?}", conn.remote_addr()); + // TODO send to logstash + debug!("new connection from {:?}", conn.remote_addr()); let node_config = node_config.clone(); async move { Ok::<_, Error>(service_fn({ @@ -167,7 +168,8 @@ macro_rules! static_http_api1 { } async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("http_service_try {:?}", req.uri()); + // TODO send to logstash + debug!("http_service_try {:?}", req.uri()); let uri = req.uri().clone(); let path = uri.path(); if path == "/api/4/node_status" { @@ -299,7 +301,6 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else if let Some(h) = channelarchiver::BlockStream::should_handle(path) { h.handle(req, &node_config).await } else if path.starts_with("/api/1/requestStatus/") { - info!("{}", path); Ok(response(StatusCode::OK).body(Body::from("{}"))?) } else if path.starts_with("/api/1/documentation/") { if req.method() == Method::GET { @@ -424,7 +425,7 @@ async fn binned(req: Request, node_config: &NodeConfigCached) -> Result binned_binary(query, node_config).await, @@ -473,7 +474,7 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result ); let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str()); span1.in_scope(|| { - info!("prebinned STARTING"); + debug!("prebinned STARTING"); }); let fut = disk::binned::prebinned::pre_binned_bytes_for_http(node_config, &query).instrument(span1); let ret = match fut.await { @@ -491,7 +492,7 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result } async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("httpret plain_events headers: {:?}", req.headers()); + debug!("httpret plain_events headers: {:?}", req.headers()); let accept_def = ""; let accept = req .headers() @@ -522,7 +523,7 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res } async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("httpret plain_events_binary req: {:?}", req); + debug!("httpret plain_events_binary req: {:?}", req); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let query = PlainEventsBinaryQuery::from_url(&url)?; let op = disk::channelexec::PlainEvents::new( @@ -538,7 +539,7 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) } async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("httpret plain_events_json req: {:?}", req); + debug!("httpret plain_events_json req: {:?}", req); let (head, _body) = req.into_parts(); let query = PlainEventsJsonQuery::from_request_head(&head)?; let op = disk::channelexec::PlainEventsJson::new( diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 63bda4c..3573051 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -179,7 +179,7 @@ where fn aggregator(range: NanoRange, _x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { // TODO remove output if range.delta() > SEC * 0 { - netpod::log::info!( + netpod::log::debug!( "TimeBinnableType for XBinnedScalarEvents aggregator() range {:?}", range ); diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 5abd881..251d5a2 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -19,6 +19,7 @@ arrayref = "0.3.6" byteorder = "1.4.3" futures-core = "0.3.14" futures-util = "0.3.14" +md-5 = "0.9.1" err = { path = "../err" } netpod = { path = "../netpod" } taskrun = { path = "../taskrun" } diff --git a/netfetch/src/netbuf.rs b/netfetch/src/netbuf.rs index 2d5eea5..1eca692 100644 --- a/netfetch/src/netbuf.rs +++ b/netfetch/src/netbuf.rs @@ -114,4 +114,30 @@ impl NetBuf { Ok(()) } } + + pub fn put_u8(&mut self, v: u8) -> Result<(), Error> { + type T = u8; + const TS: usize = std::mem::size_of::(); + self.rewind_if_needed(); + if self.wrcap() < TS { + return Err(Error::with_msg_no_trace("not enough space")); + } else { + self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); + self.wp += TS; + Ok(()) + } + } + + pub fn put_u64(&mut self, v: u64) -> Result<(), Error> { + type T = u64; + const TS: usize = std::mem::size_of::(); + self.rewind_if_needed(); + if self.wrcap() < TS { + return Err(Error::with_msg_no_trace("not enough space")); + } else { + self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); + self.wp += TS; + Ok(()) + } + } } diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs index 1954349..dd8f62b 100644 --- a/netfetch/src/test.rs +++ b/netfetch/src/test.rs @@ -1,12 +1,14 @@ +use err::Error; use futures_util::StreamExt; use netpod::log::*; use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached}; use std::collections::BTreeMap; use std::iter::FromIterator; +use std::time::Duration; #[test] fn ca_connect_1() { - taskrun::run(async { + let fut = async { let it = vec![(String::new(), String::new())].into_iter(); let pairs = BTreeMap::from_iter(it); let node_config = NodeConfigCached { @@ -42,9 +44,13 @@ fn ca_connect_1() { }; let mut rx = super::ca::ca_connect_1(pairs, &node_config).await?; while let Some(item) = rx.next().await { - info!("got next: {:?}", item); + debug!("got next: {:?}", item); } - Ok(()) - }) - .unwrap(); + Ok::<_, Error>(()) + }; + let fut = async move { + let ret = tokio::time::timeout(Duration::from_millis(4000), fut).await??; + Ok(ret) + }; + taskrun::run(fut).unwrap(); } diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index c019d48..c3e9c69 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -7,10 +7,13 @@ use crate::netbuf::NetBuf; use crate::netbuf::RP_REW_PT; use async_channel::Receiver; use async_channel::Sender; +#[allow(unused)] +use bytes::BufMut; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use netpod::log::*; +use netpod::timeunits::SEC; use serde_json::Value as JsVal; use std::fmt; use std::mem; @@ -20,7 +23,8 @@ use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; -#[test] +//#[test] +#[allow(unused)] fn test_listen() -> Result<(), Error> { use std::time::Duration; let fut = async move { @@ -30,7 +34,8 @@ fn test_listen() -> Result<(), Error> { taskrun::run(fut) } -#[test] +//#[test] +#[allow(unused)] fn test_service() -> Result<(), Error> { //use std::time::Duration; let fut = async move { @@ -39,7 +44,7 @@ fn test_service() -> Result<(), Error> { info!("accepting..."); let (conn, remote) = sock.accept().await?; info!("new connection from {:?}", remote); - let mut zmtp = Zmtp::new(conn); + let mut zmtp = Zmtp::new(conn, SocketType::PUSH); let fut = async move { while let Some(item) = zmtp.next().await { info!("item from {:?} {:?}", remote, item); @@ -61,7 +66,7 @@ pub async fn zmtp_00() -> Result<(), Error> { pub async fn zmtp_client(addr: &str) -> Result<(), Error> { let conn = tokio::net::TcpStream::connect(addr).await?; - let mut zmtp = Zmtp::new(conn); + let mut zmtp = Zmtp::new(conn, SocketType::PULL); let mut i1 = 0; while let Some(item) = zmtp.next().await { match item { @@ -117,11 +122,87 @@ impl ConnState { } } -struct DummyData {} +pub enum SocketType { + PUSH, + PULL, +} + +struct DummyData { + ts: u64, + pulse: u64, + value: i64, +} + +impl DummyData { + fn make_zmtp_msg(&self) -> Result { + let head_b = HeadB { + htype: "bsr_d-1.1".into(), + channels: vec![ChannelDesc { + name: "TESTCHAN".into(), + ty: "int64".into(), + shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]), + encoding: "little".into(), + }], + }; + let hb = serde_json::to_vec(&head_b).unwrap(); + use md5::Digest; + let mut h = md5::Md5::new(); + h.update(&hb); + let mut md5hex = String::with_capacity(32); + for c in h.finalize() { + use fmt::Write; + write!(&mut md5hex, "{:02x}", c).unwrap(); + } + let head_a = HeadA { + htype: "bsr_m-1.1".into(), + hash: md5hex, + pulse_id: serde_json::Number::from(self.pulse), + global_timestamp: GlobalTimestamp { + sec: self.ts / SEC, + ns: self.ts % SEC, + }, + }; + // TODO write directly to output buffer. + let ha = serde_json::to_vec(&head_a).unwrap(); + let hf = self.value.to_le_bytes().to_vec(); + let hp = [(self.ts / SEC).to_be_bytes(), (self.ts % SEC).to_be_bytes()].concat(); + let mut msg = ZmtpMessage { frames: vec![] }; + let fr = ZmtpFrame { + msglen: 0, + has_more: false, + is_command: false, + data: ha, + }; + msg.frames.push(fr); + let fr = ZmtpFrame { + msglen: 0, + has_more: false, + is_command: false, + data: hb, + }; + msg.frames.push(fr); + let fr = ZmtpFrame { + msglen: 0, + has_more: false, + is_command: false, + data: hf, + }; + msg.frames.push(fr); + let fr = ZmtpFrame { + msglen: 0, + has_more: false, + is_command: false, + data: hp, + }; + msg.frames.push(fr); + Ok(msg) + } +} struct Zmtp { done: bool, complete: bool, + socket_type: SocketType, conn: TcpStream, conn_state: ConnState, buf: NetBuf, @@ -137,7 +218,7 @@ struct Zmtp { } impl Zmtp { - fn new(conn: TcpStream) -> Self { + fn new(conn: TcpStream, socket_type: SocketType) -> Self { //conn.set_send_buffer_size(1024 * 64)?; //conn.set_recv_buffer_size(1024 * 1024 * 4)?; //info!("send_buffer_size {:8}", conn.send_buffer_size()?); @@ -146,6 +227,7 @@ impl Zmtp { Self { done: false, complete: false, + socket_type, conn, conn_state: ConnState::InitSend, buf: NetBuf::new(), @@ -192,17 +274,33 @@ impl Zmtp { } ConnState::InitRecv2 => { info!("parse_item InitRecv2"); + let msgrem = self.conn_state.need_min(); + let ver_min = self.buf.read_u8()?; + let msgrem = msgrem - 1; + info!("Peer minor version {}", ver_min); // TODO parse greeting remainder.. sec-scheme. - self.buf.adv(self.conn_state.need_min())?; - self.outbuf - .put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..])?; + self.buf.adv(msgrem)?; + match self.socket_type { + SocketType::PUSH => { + self.outbuf + .put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PUSH"[..])?; + } + SocketType::PULL => { + self.outbuf + .put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..])?; + } + } self.out_enable = true; self.conn_state = ConnState::ReadFrameFlags; let tx = self.data_tx.clone(); let fut1 = async move { loop { tokio::time::sleep(Duration::from_millis(1000)).await; - let dd = DummyData {}; + let dd = DummyData { + ts: 420032002200887766, + pulse: 123123123123, + value: -777, + }; match tx.send(dd).await { Ok(()) => { info!("item send to channel"); @@ -304,6 +402,20 @@ impl ZmtpMessage { pub fn frames(&self) -> &Vec { &self.frames } + + pub fn emit_to_buffer(&self, out: &mut NetBuf) -> Result<(), Error> { + let n = self.frames.len(); + for (i, fr) in self.frames.iter().enumerate() { + let mut flags: u8 = 2; + if i < n - 1 { + flags |= 1; + } + out.put_u8(flags)?; + out.put_u64(fr.data().len() as u64)?; + out.put_slice(fr.data())?; + } + Ok(()) + } } pub struct ZmtpFrame { @@ -349,11 +461,23 @@ enum Int { } impl Int { - fn is_item(&self) -> bool { + fn item_count(&self) -> u32 { if let Int::Item(_) = self { - true + 1 } else { - false + 0 + } + } +} + +impl fmt::Debug for Int { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::NoWork => write!(f, "NoWork"), + Self::Pending => write!(f, "Pending"), + Self::Empty => write!(f, "Empty"), + Self::Item(_) => write!(f, "Item"), + Self::Done => write!(f, "Done"), } } } @@ -375,39 +499,33 @@ impl Stream for Zmtp { return Ready(None); } loop { - let have_item = false; - let serialized: Int<()> = if self.out_enable && self.outbuf.wrcap() >= RP_REW_PT { + let mut item_count = 0; + let serialized: Int> = if self.out_enable && self.outbuf.wrcap() >= RP_REW_PT { match self.data_rx.poll_next_unpin(cx) { - Ready(Some(k)) => { - info!("item from channel, put to output buffer"); - let head_a = HeadA { - htype: "bsr_m-1.1".into(), - // TODO hash definition? - hash: "b9e0916effc5a8a2f1977a9eb8beea63".into(), - pulse_id: serde_json::Number::from(42424242), - global_timestamp: GlobalTimestamp { - sec: 1636401670, - ns: 12920856, - }, - }; - let head_b = HeadB { - htype: "bsr_d-1.1".into(), - channels: vec![ChannelDesc { - name: "TESTCHAN".into(), - ty: "int64".into(), - shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]), - encoding: "little".into(), - }], - }; - let ha = serde_json::to_vec(&head_a).unwrap(); - let hb = serde_json::to_vec(&head_b).unwrap(); - let hf = 23478236u64.to_le_bytes(); - let hp = 13131313u64.to_le_bytes(); - self.outbuf.put_slice(&ha).unwrap(); - self.outbuf.put_slice(&hb).unwrap(); - self.outbuf.put_slice(&hf).unwrap(); - self.outbuf.put_slice(&hp).unwrap(); - Int::Empty + Ready(Some(item)) => { + let msg = item.make_zmtp_msg().unwrap(); + match msg.emit_to_buffer(&mut self.outbuf) { + Ok(_) => Int::Empty, + Err(e) => { + self.done = true; + Int::Item(Err(e)) + } + } + /*let mut msgs = Vec::with_capacity(1024 * 8); + msgs.put_u8(1 | 2); + msgs.put_u64(ha.len() as u64); + msgs.put_slice(&ha); + msgs.put_u8(1 | 2); + msgs.put_u64(hb.len() as u64); + msgs.put_slice(&hb); + msgs.put_u8(1 | 2); + msgs.put_u64(hf.len() as u64); + msgs.put_slice(&hf); + msgs.put_u8(2); + msgs.put_u64(hp.len() as u64); + msgs.put_slice(&hp); + self.outbuf.put_slice(&msgs).unwrap(); + Int::Empty*/ } Ready(None) => Int::Done, Pending => Int::Pending, @@ -415,8 +533,10 @@ impl Stream for Zmtp { } else { Int::NoWork }; - let have_item = have_item | serialized.is_item(); - let write = if self.outbuf.len() > 0 { + item_count += serialized.item_count(); + let write: Int> = if item_count > 0 { + Int::NoWork + } else if self.outbuf.len() > 0 { let (b, w) = self.outbuf_conn(); pin_mut!(w); match w.poll_write(cx, b) { @@ -427,17 +547,24 @@ impl Stream for Zmtp { self.outbuf.rewind_if_needed(); Int::Empty } - Err(e) => Int::Item(Err::<(), _>(e)), + Err(e) => { + error!("advance error {:?}", e); + Int::Item(Err(e)) + } }, - Err(e) => Int::Item(Err(e.into())), + Err(e) => { + error!("output write error {:?}", e); + Int::Item(Err(e.into())) + } }, Pending => Int::Pending, } } else { Int::NoWork }; - let have_item = have_item | write.is_item(); - let read: Int> = if have_item || self.inp_eof { + info!("write result: {:?} {}", write, self.outbuf.len()); + item_count += write.item_count(); + let read: Int> = if item_count > 0 || self.inp_eof { Int::NoWork } else { if self.buf.cap() < self.conn_state.need_min() { @@ -480,8 +607,8 @@ impl Stream for Zmtp { Int::NoWork } }; - let have_item = have_item | read.is_item(); - let parsed = if have_item || self.buf.len() < self.conn_state.need_min() { + item_count += read.item_count(); + let parsed = if item_count > 0 || self.buf.len() < self.conn_state.need_min() { Int::NoWork } else { match self.parse_item() { @@ -492,43 +619,63 @@ impl Stream for Zmtp { Err(e) => Int::Item(Err(e)), } }; - let _have_item = have_item | parsed.is_item(); + item_count += parsed.item_count(); + let _ = item_count; { use Int::*; - match (write, read, parsed) { - (Item(_), Item(_), _) => panic!(), - (Item(_), _, Item(_)) => panic!(), - (_, Item(_), Item(_)) => panic!(), - (NoWork | Done, NoWork | Done, NoWork | Done) => { + match (serialized, write, read, parsed) { + (NoWork | Done, NoWork | Done, NoWork | Done, NoWork | Done) => { warn!("all NoWork or Done"); break Poll::Pending; } - (_, Item(Err(e)), _) => { + (Item(Err(e)), _, _, _) => { self.done = true; break Poll::Ready(Some(Err(e))); } - (_, _, Item(Err(e))) => { + (_, Item(Err(e)), _, _) => { self.done = true; break Poll::Ready(Some(Err(e))); } - (Item(_), _, _) => { + (_, _, Item(Err(e)), _) => { + self.done = true; + break Poll::Ready(Some(Err(e))); + } + (_, _, _, Item(Err(e))) => { + self.done = true; + break Poll::Ready(Some(Err(e))); + } + (Item(_), _, _, _) => { continue; } - (_, Item(Ok(_)), _) => { + (_, Item(_), _, _) => { continue; } - (_, _, Item(Ok(item))) => { + (_, _, Item(_), _) => { + continue; + } + (_, _, _, Item(Ok(item))) => { break Poll::Ready(Some(Ok(item))); } - (Empty, _, _) => continue, - (_, Empty, _) => continue, - (_, _, Empty) => continue, + (Empty, _, _, _) => continue, + (_, Empty, _, _) => continue, + (_, _, Empty, _) => continue, + (_, _, _, Empty) => continue, #[allow(unreachable_patterns)] - (Pending, Pending | NoWork | Done, Pending | NoWork | Done) => break Poll::Pending, + (Pending, Pending | NoWork | Done, Pending | NoWork | Done, Pending | NoWork | Done) => { + break Poll::Pending + } #[allow(unreachable_patterns)] - (Pending | NoWork | Done, Pending, Pending | NoWork | Done) => break Poll::Pending, + (Pending | NoWork | Done, Pending, Pending | NoWork | Done, Pending | NoWork | Done) => { + break Poll::Pending + } #[allow(unreachable_patterns)] - (Pending | NoWork | Done, Pending | NoWork | Done, Pending) => break Poll::Pending, + (Pending | NoWork | Done, Pending | NoWork | Done, Pending, Pending | NoWork | Done) => { + break Poll::Pending + } + #[allow(unreachable_patterns)] + (Pending | NoWork | Done, Pending | NoWork | Done, Pending | NoWork | Done, Pending) => { + break Poll::Pending + } } }; } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index cfa69a4..d76a893 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -134,6 +134,22 @@ impl ScalarType { BOOL => 0, } } + + pub fn to_api3proto(&self) -> &'static str { + match self { + ScalarType::U8 => "uint8", + ScalarType::U16 => "uint16", + ScalarType::U32 => "uint32", + ScalarType::U64 => "uint64", + ScalarType::I8 => "int8", + ScalarType::I16 => "int16", + ScalarType::I32 => "int32", + ScalarType::I64 => "int64", + ScalarType::F32 => "float32", + ScalarType::F64 => "float64", + ScalarType::BOOL => "bool", + } + } } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 184f246..d910e03 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -115,7 +115,7 @@ async fn events_conn_handler_inner_try( return Err((Error::with_msg("json parse error"), netout))?; } }; - info!("--- nodenet::conn got query -------------------\nevq {:?}", evq); + debug!("--- nodenet::conn got query -------------------\nevq {:?}", evq); let mut p1: Pin> + Send>> = if let Some(aa) = &node_config.node.channel_archiver { @@ -142,7 +142,6 @@ async fn events_conn_handler_inner_try( }; let mut buf_len_histo = HistoLog2::new(5); while let Some(item) = p1.next().await { - //info!("conn.rs encode frame typeid {:x}", item.typeid()); let item = item.make_frame(); match item { Ok(buf) => { @@ -166,6 +165,6 @@ async fn events_conn_handler_inner_try( Ok(_) => (), Err(e) => return Err((e, netout))?, } - info!("events_conn_handler_inner_try buf_len_histo: {:?}", buf_len_histo); + debug!("events_conn_handler_inner_try buf_len_histo: {:?}", buf_len_histo); Ok(()) }