diff --git a/Cargo.toml b/Cargo.toml index d26f92f..b74118a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["daqbuffer", "httpret", "h5out", "items", "items_2", "items_proc", "nodenet", "httpclient", "dq"] +members = ["daqbuffer", "httpret", "h5out", "items_proc", "nodenet", "httpclient", "dq"] [profile.release] opt-level = 1 diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml index 79119d5..4d35d9e 100644 --- a/daqbufp2/Cargo.toml +++ b/daqbufp2/Cargo.toml @@ -31,7 +31,6 @@ httpclient = { path = "../httpclient" } disk = { path = "../disk" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } -items = { path = "../items" } streams = { path = "../streams" } [dev-dependencies] diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index f1c3031..47ea0fe 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -28,4 +28,3 @@ err = { path = "../err" } netpod = { path = "../netpod" } parse = { path = "../parse" } taskrun = { path = "../taskrun" } -items = { path = "../items" } diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 9e3fe4b..2be3b3c 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -40,7 +40,6 @@ query = { path = "../query" } bitshuffle = { path = "../bitshuffle" } dbconn = { path = "../dbconn" } parse = { path = "../parse" } -items = { path = "../items" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } streams = { path = "../streams" } diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index cc5f612..88059ad 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -5,6 +5,7 @@ use netpod::ChannelConfig; use netpod::NodeConfigCached; use parse::channelconfig::extract_matching_config_entry; use parse::channelconfig::read_local_config; +use parse::channelconfig::ChannelConfigs; use parse::channelconfig::MatchingConfigEntry; pub async fn config( @@ -12,14 +13,24 @@ pub async fn config( channel: Channel, node_config: &NodeConfigCached, ) -> Result { - let channel_config = read_local_config(channel.clone(), node_config.node.clone()).await?; - let entry_res = match extract_matching_config_entry(&range, &channel_config) { + let channel_configs = read_local_config(channel.clone(), node_config.node.clone()).await?; + let entry_res = match extract_matching_config_entry(&range, &channel_configs) { Ok(k) => k, Err(e) => return Err(e)?, }; let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?, - MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?, + MatchingConfigEntry::None => { + return Err(Error::with_public_msg(format!( + "disk::channelconfig no config entry found for {:?}", + channel + )))? + } + MatchingConfigEntry::Multiple => { + return Err(Error::with_public_msg(format!( + "disk::channelconfig multiple config entries in range found for {:?}", + channel + )))? + } MatchingConfigEntry::Entry(entry) => entry, }; let shape = match entry.to_shape() { @@ -30,7 +41,7 @@ pub async fn config( channel: channel.clone(), keyspace: entry.ks as u8, time_bin_size: entry.bs.clone(), - shape: shape, + shape, scalar_type: entry.scalar_type.clone(), byte_order: entry.byte_order.clone(), array: entry.is_array, @@ -38,3 +49,7 @@ pub async fn config( }; Ok(channel_config) } + +pub async fn configs(channel: Channel, node_config: &NodeConfigCached) -> Result { + read_local_config(channel.clone(), node_config.node.clone()).await +} diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 23776dd..f618f49 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -8,6 +8,7 @@ use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::WithLen; use items_2::eventfull::EventFull; use items_2::merger::Merger; use netpod::log::*; @@ -93,9 +94,7 @@ impl Stream for EventChunkerMultifile { type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - //tracing::field::DebugValue; let span1 = span!(Level::INFO, "EvChMul", node_ix = self.node_ix); - //span1.record("node_ix", &self.node_ix); let _spg = span1.enter(); use Poll::*; 'outer: loop { @@ -117,7 +116,7 @@ impl Stream for EventChunkerMultifile { Ready(Some(Ok(k))) => { let k = if let StreamItem::DataItem(RangeCompletableItem::Data(h)) = k { let mut h: EventFull = h; - if h.tss.len() > 0 { + if h.len() > 0 { let min = h.tss.iter().fold(u64::MAX, |a, &x| a.min(x)); let max = h.tss.iter().fold(u64::MIN, |a, &x| a.max(x)); if min <= self.max_ts { @@ -131,7 +130,7 @@ impl Stream for EventChunkerMultifile { "EventChunkerMultifile emit {}/{} events {}", self.emit_count, after, - h.tss.len() + h.len() ); self.emit_count += 1; } @@ -224,9 +223,6 @@ impl Stream for EventChunkerMultifile { self.expand, self.do_decompress, ); - let chunker = chunker - //.map(|x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) - ; chunkers.push(Box::pin(chunker) as _); } } @@ -270,6 +266,7 @@ mod test { use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StreamItem; + use items_0::WithLen; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::timeunits::DAY; @@ -328,7 +325,7 @@ mod test { RangeCompletableItem::Data(item) => { // TODO assert more debug!("item: {:?}", item.tss.iter().map(|x| x / MS).collect::>()); - event_count += item.tss.len(); + event_count += item.len(); for ts in item.tss { tss.push(ts); } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 3c0e073..64325a4 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -64,11 +64,13 @@ pub async fn make_event_pipe( } } let range = evq.range().clone(); - let channel_config = match read_local_config(evq.channel().clone(), node_config.node.clone()).await { - Ok(k) => k, + let channel_config = + crate::channelconfig::config(evq.range().try_into()?, evq.channel().clone(), node_config).await; + let channel_config = match channel_config { + Ok(x) => x, Err(e) => { - // TODO introduce detailed error type if e.msg().contains("ErrorKind::NotFound") { + warn!("{e}"); let s = futures_util::stream::empty(); return Ok(Box::pin(s)); } else { @@ -76,29 +78,6 @@ pub async fn make_event_pipe( } } }; - let entry_res = match extract_matching_config_entry(&(&range).try_into()?, &channel_config) { - Ok(k) => k, - Err(e) => return Err(e)?, - }; - let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?, - MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?, - MatchingConfigEntry::Entry(entry) => entry, - }; - let shape = match entry.to_shape() { - Ok(k) => k, - Err(e) => return Err(e)?, - }; - let channel_config = netpod::ChannelConfig { - channel: evq.channel().clone(), - keyspace: entry.ks as u8, - time_bin_size: entry.bs.clone(), - shape, - scalar_type: entry.scalar_type.clone(), - byte_order: entry.byte_order.clone(), - array: entry.is_array, - compression: entry.is_compressed, - }; trace!( "make_event_pipe need_expand {need_expand} {evq:?}", need_expand = evq.one_before_range() @@ -122,14 +101,10 @@ pub async fn make_event_pipe( true, out_max_len, ); - let shape = entry.to_shape()?; + let scalar_type = channel_config.scalar_type.clone(); + let shape = channel_config.shape.clone(); error!("TODO replace AggKind in the called code"); - let pipe = make_num_pipeline_stream_evs( - entry.scalar_type.clone(), - shape.clone(), - AggKind::TimeWeightedScalar, - event_blobs, - ); + let pipe = make_num_pipeline_stream_evs(scalar_type, shape.clone(), AggKind::TimeWeightedScalar, event_blobs); Ok(pipe) } @@ -138,14 +113,24 @@ pub async fn get_applicable_entry( channel: Channel, node_config: &NodeConfigCached, ) -> Result { - let channel_config = read_local_config(channel, node_config.node.clone()).await?; + let channel_config = read_local_config(channel.clone(), node_config.node.clone()).await?; let entry_res = match extract_matching_config_entry(range, &channel_config) { Ok(k) => k, Err(e) => return Err(e)?, }; let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?, - MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?, + MatchingConfigEntry::None => { + return Err(Error::with_public_msg(format!( + "get_applicable_entry no config entry found {:?}", + channel + )))? + } + MatchingConfigEntry::Multiple => { + return Err(Error::with_public_msg(format!( + "get_applicable_entry multiple config entries found for {:?}", + channel + )))? + } MatchingConfigEntry::Entry(entry) => entry, }; Ok(entry.clone()) @@ -259,7 +244,21 @@ pub async fn make_event_blobs_pipe( } let expand = evq.one_before_range(); let range = evq.range(); - let entry = get_applicable_entry(&evq.range().try_into()?, evq.channel().clone(), node_config).await?; + let entry = match get_applicable_entry(&evq.range().try_into()?, evq.channel().clone(), node_config).await { + Ok(x) => x, + Err(e) => { + if e.to_public_error().msg().contains("no config entry found") { + let item = items_0::streamitem::LogItem { + node_ix: node_config.ix as _, + level: Level::WARN, + msg: format!("{} {}", node_config.node.host, e), + }; + return Ok(Box::pin(stream::iter([Ok(StreamItem::Log(item))]))); + } else { + return Err(e); + } + } + }; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); type ItemType = Sitemty; // TODO should depend on host config diff --git a/dq/Cargo.toml b/dq/Cargo.toml index 4b2b279..09594bb 100644 --- a/dq/Cargo.toml +++ b/dq/Cargo.toml @@ -16,7 +16,6 @@ bytes = "1.0.1" err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } -items = { path = "../items" } parse = { path = "../parse" } disk = { path = "../disk" } streams = { path = "../streams" } diff --git a/err/src/lib.rs b/err/src/lib.rs index 6347289..a49588a 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -112,7 +112,7 @@ impl Error { reason: self.reason(), msg: self .public_msg() - .map(|k| k.join("\n")) + .map(|k| k.join("; ")) .unwrap_or("No error message".into()), } } @@ -167,8 +167,8 @@ impl fmt::Debug for Error { }; write!(fmt, "msg: {}", self.msg)?; if let Some(msgs) = self.public_msg() { - for msg in msgs { - write!(fmt, "\npublic: {}", msg)?; + for (i, msg) in msgs.iter().enumerate() { + write!(fmt, "; pub({i}): {msg}")?; } } if !trace_str.is_empty() { diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 208543c..8fa4f2e 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -29,7 +29,6 @@ netpod = { path = "../netpod" } query = { path = "../query" } dbconn = { path = "../dbconn" } disk = { path = "../disk" } -items = { path = "../items" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 3429a74..043241e 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -8,6 +8,7 @@ use bytes::BufMut; use bytes::BytesMut; use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes; use disk::raw::conn::make_local_event_blobs_stream; +use futures_util::stream; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; @@ -22,6 +23,7 @@ use hyper::Response; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::WithLen; use items_2::eventfull::EventFull; use itertools::Itertools; use netpod::log::*; @@ -43,7 +45,7 @@ use netpod::APP_JSON; use netpod::APP_OCTET; use parse::channelconfig::extract_matching_config_entry; use parse::channelconfig::read_local_config; -use parse::channelconfig::Config; +use parse::channelconfig::ChannelConfigs; use parse::channelconfig::ConfigEntry; use parse::channelconfig::MatchingConfigEntry; use query::api4::events::PlainEventsQuery; @@ -641,7 +643,7 @@ pub struct DataApiPython3DataStream { node_config: NodeConfigCached, chan_ix: usize, chan_stream: Option> + Send>>>, - config_fut: Option> + Send>>>, + config_fut: Option> + Send>>>, disk_io_tune: DiskIoTune, do_decompress: bool, #[allow(unused)] @@ -689,7 +691,7 @@ impl DataApiPython3DataStream { count_events: &mut usize, ) -> Result { let mut d = BytesMut::new(); - for i1 in 0..b.tss.len() { + for i1 in 0..b.len() { const EVIMAX: usize = 6; if *count_events < EVIMAX { debug!( @@ -811,85 +813,94 @@ impl Stream for DataApiPython3DataStream { Ok(k) => k, Err(e) => return Err(e)?, }; - let entry = match entry_res { + match entry_res { MatchingConfigEntry::None => { - return Err(Error::with_public_msg("no config entry found"))? + warn!("DataApiPython3DataStream no config entry found for {:?}", config); + self.chan_stream = Some(Box::pin(stream::empty())); + continue; } MatchingConfigEntry::Multiple => { - return Err(Error::with_public_msg("multiple config entries found"))? + warn!( + "DataApiPython3DataStream multiple config entries found for {:?}", + config + ); + self.chan_stream = Some(Box::pin(stream::empty())); + continue; } - MatchingConfigEntry::Entry(entry) => entry.clone(), - }; - let channel = self.channels[self.chan_ix - 1].clone(); - debug!("found channel_config for {}: {:?}", channel.name, entry); - let evq = PlainEventsQuery::new(channel, self.range.clone()).for_event_blobs(); - info!("query for event blobs retrieval: evq {evq:?}"); - warn!("fix magic inmem_bufcap"); - let perf_opts = PerfOpts::default(); - // TODO is this a good to place decide this? - let s = if self.node_config.node_config.cluster.is_central_storage { - info!("Set up central storage stream"); - // TODO pull up this config - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let s = make_local_event_blobs_stream( - evq.range().try_into()?, - evq.channel().clone(), - &entry, - evq.one_before_range(), - self.do_decompress, - event_chunker_conf, - self.disk_io_tune.clone(), - &self.node_config, - )?; - Box::pin(s) as Pin> + Send>> - } else { - if let Some(sh) = &entry.shape { - if sh.len() > 1 { - warn!("Remote stream fetch for shape {sh:?}"); - } - } - debug!("Set up merged remote stream"); - let s = MergedBlobsFromRemotes::new( - evq, - perf_opts, - self.node_config.node_config.cluster.clone(), - ); - Box::pin(s) as Pin> + Send>> - }; - let s = s.map({ - let mut header_out = false; - let mut count_events = 0; - let channel = self.channels[self.chan_ix - 1].clone(); - move |b| { - let ret = match b { - Ok(b) => { - let f = match b { - StreamItem::DataItem(RangeCompletableItem::Data(b)) => { - Self::convert_item( - b, - &channel, - &entry, - &mut header_out, - &mut count_events, - )? - } - _ => BytesMut::new(), - }; - Ok(f) + MatchingConfigEntry::Entry(entry) => { + let entry = entry.clone(); + let channel = self.channels[self.chan_ix - 1].clone(); + debug!("found channel_config for {}: {:?}", channel.name, entry); + let evq = PlainEventsQuery::new(channel, self.range.clone()).for_event_blobs(); + info!("query for event blobs retrieval: evq {evq:?}"); + warn!("fix magic inmem_bufcap"); + let perf_opts = PerfOpts::default(); + // TODO is this a good to place decide this? + let s = if self.node_config.node_config.cluster.is_central_storage { + info!("Set up central storage stream"); + // TODO pull up this config + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let s = make_local_event_blobs_stream( + evq.range().try_into()?, + evq.channel().clone(), + &entry, + evq.one_before_range(), + self.do_decompress, + event_chunker_conf, + self.disk_io_tune.clone(), + &self.node_config, + )?; + Box::pin(s) as Pin> + Send>> + } else { + if let Some(sh) = &entry.shape { + if sh.len() > 1 { + warn!("Remote stream fetch for shape {sh:?}"); + } } - Err(e) => Err(e), + debug!("Set up merged remote stream"); + let s = MergedBlobsFromRemotes::new( + evq, + perf_opts, + self.node_config.node_config.cluster.clone(), + ); + Box::pin(s) as Pin> + Send>> }; - ret + let s = s.map({ + let mut header_out = false; + let mut count_events = 0; + let channel = self.channels[self.chan_ix - 1].clone(); + move |b| { + let ret = match b { + Ok(b) => { + let f = match b { + StreamItem::DataItem(RangeCompletableItem::Data(b)) => { + Self::convert_item( + b, + &channel, + &entry, + &mut header_out, + &mut count_events, + )? + } + _ => BytesMut::new(), + }; + Ok(f) + } + Err(e) => Err(e), + }; + ret + } + }); + //let _ = Box::new(s) as Box> + Unpin>; + let evm = if self.events_max == 0 { + usize::MAX + } else { + self.events_max as usize + }; + self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm))); + continue; } - }); - //let _ = Box::new(s) as Box> + Unpin>; - let evm = if self.events_max == 0 { - usize::MAX - } else { - self.events_max as usize - }; - self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm))); - continue; + } } Ready(Err(e)) => { self.config_fut = None; diff --git a/httpret/src/api4/search.rs b/httpret/src/api4/search.rs index ece579d..44516c7 100644 --- a/httpret/src/api4/search.rs +++ b/httpret/src/api4/search.rs @@ -1,12 +1,17 @@ -use crate::bodystream::{response, ToPublicResponse}; +use crate::bodystream::response; +use crate::bodystream::ToPublicResponse; use crate::Error; +use http::Method; +use http::Request; +use http::Response; use http::StatusCode; -use http::{Method, Request, Response}; use hyper::Body; use netpod::log::*; +use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; -use netpod::{ChannelSearchQuery, NodeConfigCached}; -use netpod::{ACCEPT_ALL, APP_JSON}; +use netpod::NodeConfigCached; +use netpod::ACCEPT_ALL; +use netpod::APP_JSON; use url::Url; pub async fn channel_search(req: Request, node_config: &NodeConfigCached) -> Result { diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 9f62c09..b675360 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -127,6 +127,65 @@ impl ChannelConfigHandler { } } +pub struct ChannelConfigsHandler {} + +impl ChannelConfigsHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/channel/configs" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { + match self.channel_configs(req, &node_config).await { + Ok(k) => Ok(k), + Err(e) => { + warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}"); + Ok(e.to_public_response()) + } + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } + + async fn channel_configs( + &self, + req: Request, + node_config: &NodeConfigCached, + ) -> Result, Error> { + info!("channel_configs"); + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = ChannelConfigQuery::from_url(&url)?; + info!("channel_configs for q {q:?}"); + let conf = if let Some(_) = &node_config.node_config.cluster.scylla { + return Err(Error::with_msg_no_trace("TODO")); + } else if let Some(_) = &node_config.node.channel_archiver { + return Err(Error::with_msg_no_trace("TODO")); + } else if let Some(_) = &node_config.node.archiver_appliance { + return Err(Error::with_msg_no_trace("TODO")); + } else { + disk::channelconfig::configs(q.channel, node_config).await? + }; + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(Body::from(serde_json::to_string(&conf)?))?; + Ok(ret) + } +} + trait ErrConv { fn err_conv(self) -> Result; } diff --git a/httpret/src/err.rs b/httpret/src/err.rs index 6b5b16f..b964492 100644 --- a/httpret/src/err.rs +++ b/httpret/src/err.rs @@ -1,24 +1,25 @@ -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::fmt; #[derive(Serialize, Deserialize)] -pub struct Error(pub ::err::Error); +pub struct Error(pub err::Error); impl Error { pub fn with_msg>(s: S) -> Self { - Self(::err::Error::with_msg(s)) + Self(err::Error::with_msg(s)) } pub fn with_msg_no_trace>(s: S) -> Self { - Self(::err::Error::with_msg_no_trace(s)) + Self(err::Error::with_msg_no_trace(s)) } pub fn with_public_msg>(s: S) -> Self { - Self(::err::Error::with_public_msg(s)) + Self(err::Error::with_public_msg(s)) } pub fn with_public_msg_no_trace>(s: S) -> Self { - Self(::err::Error::with_public_msg_no_trace(s)) + Self(err::Error::with_public_msg_no_trace(s)) } pub fn msg(&self) -> &str { @@ -52,13 +53,13 @@ impl fmt::Display for Error { impl std::error::Error for Error {} -impl From<::err::Error> for Error { - fn from(x: ::err::Error) -> Self { +impl From for Error { + fn from(x: err::Error) -> Self { Self(x) } } -impl From for ::err::Error { +impl From for err::Error { fn from(x: Error) -> Self { x.0 } diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index af848b3..9a01ac8 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -30,6 +30,7 @@ use hyper::Body; use hyper::Request; use hyper::Response; use net::SocketAddr; +use netpod::is_false; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; use netpod::NodeConfigCached; @@ -297,6 +298,8 @@ async fn http_service_inner( h.handle(req, &node_config).await } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = channelconfig::ChannelConfigsHandler::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) { @@ -607,9 +610,9 @@ pub struct StatusBoardEntry { ts_created: SystemTime, #[serde(serialize_with = "instant_serde::ser")] ts_updated: SystemTime, - #[serde(skip_serializing_if = "items_2::bool_is_false")] + #[serde(skip_serializing_if = "is_false")] is_error: bool, - #[serde(skip_serializing_if = "items_2::bool_is_false")] + #[serde(skip_serializing_if = "is_false")] is_ok: bool, #[serde(skip_serializing_if = "Vec::is_empty")] errors: Vec, diff --git a/items/Cargo.toml b/items/Cargo.toml deleted file mode 100644 index e471660..0000000 --- a/items/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "items" -version = "0.1.0" -authors = ["Dominik Werder "] -edition = "2021" - -[dependencies] -tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -futures-util = "0.3.15" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -ciborium = "0.2" -bson = "2.4.0" -erased-serde = "0.3" -bytes = "1.2.1" -num-traits = "0.2.15" -chrono = { version = "0.4.22", features = ["serde"] } -crc32fast = "1.3.2" -err = { path = "../err" } -items_proc = { path = "../items_proc" } -items_0 = { path = "../items_0" } -netpod = { path = "../netpod" } diff --git a/items/src/lib.rs b/items/src/lib.rs deleted file mode 100644 index 5168a83..0000000 --- a/items/src/lib.rs +++ /dev/null @@ -1,107 +0,0 @@ -use err::Error; -use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::StreamItem; -use netpod::range::evrange::NanoRange; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -use tokio::fs::File; -use tokio::io::AsyncRead; -use tokio::io::ReadBuf; - -pub trait WithTimestamps { - fn ts(&self, ix: usize) -> u64; -} - -pub trait ByteEstimate { - fn byte_estimate(&self) -> u64; -} - -pub trait RangeOverlapInfo { - // TODO do not take by value. - fn ends_before(&self, range: NanoRange) -> bool; - fn ends_after(&self, range: NanoRange) -> bool; - fn starts_after(&self, range: NanoRange) -> bool; -} - -pub trait EventAppendable -where - Self: Sized, -{ - type Value; - fn append_event(ret: Option, ts: u64, pulse: u64, value: Self::Value) -> Self; -} - -// TODO should get I/O and tokio dependence out of this crate -trait ReadableFromFile: Sized { - fn read_from_file(file: File) -> Result, Error>; - // TODO should not need this: - fn from_buf(buf: &[u8]) -> Result; -} - -// TODO should get I/O and tokio dependence out of this crate -struct ReadPbv -where - T: ReadableFromFile, -{ - buf: Vec, - all: Vec, - file: Option, - _m1: PhantomData, -} - -impl ReadPbv -where - T: ReadableFromFile, -{ - fn new(file: File) -> Self { - Self { - // TODO make buffer size a parameter: - buf: vec![0; 1024 * 32], - all: Vec::new(), - file: Some(file), - _m1: PhantomData, - } - } -} - -impl Future for ReadPbv -where - T: ReadableFromFile + Unpin, -{ - type Output = Result>, Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use Poll::*; - let mut buf = std::mem::replace(&mut self.buf, Vec::new()); - let ret = 'outer: loop { - let mut dst = ReadBuf::new(&mut buf); - if dst.remaining() == 0 || dst.capacity() == 0 { - break Ready(Err(Error::with_msg("bad read buffer"))); - } - let fp = self.file.as_mut().unwrap(); - let f = Pin::new(fp); - break match File::poll_read(f, cx, &mut dst) { - Ready(res) => match res { - Ok(_) => { - if dst.filled().len() > 0 { - self.all.extend_from_slice(dst.filled()); - continue 'outer; - } else { - match T::from_buf(&mut self.all) { - Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))), - Err(e) => Ready(Err(e)), - } - } - } - Err(e) => Ready(Err(e.into())), - }, - Pending => Pending, - }; - }; - self.buf = buf; - ret - } -} diff --git a/items_0/src/container.rs b/items_0/src/container.rs new file mode 100644 index 0000000..e26c1f7 --- /dev/null +++ b/items_0/src/container.rs @@ -0,0 +1,11 @@ +use crate::Events; + +pub trait ByteEstimate { + fn byte_estimate(&self) -> u64; +} + +impl ByteEstimate for Box { + fn byte_estimate(&self) -> u64 { + self.as_ref().byte_estimate() + } +} diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index b0e50e1..f07f3ad 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -1,10 +1,10 @@ pub mod collect_s; +pub mod container; pub mod framable; pub mod isodate; pub mod scalar_ops; pub mod streamitem; pub mod subfr; -pub mod transform; pub mod bincode { pub use bincode::*; @@ -12,6 +12,7 @@ pub mod bincode { use collect_s::Collectable; use collect_s::ToJsonResult; +use container::ByteEstimate; use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; use std::any::Any; @@ -135,7 +136,16 @@ impl From for err::Error { /// Container of some form of events, for use as trait object. pub trait Events: - fmt::Debug + TypeName + Any + Collectable + TimeBinnable + WithLen + Send + erased_serde::Serialize + EventsNonObj + fmt::Debug + + TypeName + + Any + + Collectable + + TimeBinnable + + WithLen + + ByteEstimate + + Send + + erased_serde::Serialize + + EventsNonObj { fn as_time_binnable(&self) -> &dyn TimeBinnable; fn verify(&self) -> bool; @@ -183,22 +193,22 @@ pub struct TransformProperties { pub needs_value: bool, } -pub trait Transformer { +pub trait EventTransform { fn query_transform_properties(&self) -> TransformProperties; } -impl Transformer for Box +impl EventTransform for Box where - T: Transformer, + T: EventTransform, { fn query_transform_properties(&self) -> TransformProperties { self.as_ref().query_transform_properties() } } -impl Transformer for std::pin::Pin> +impl EventTransform for std::pin::Pin> where - T: Transformer, + T: EventTransform, { fn query_transform_properties(&self) -> TransformProperties { self.as_ref().query_transform_properties() diff --git a/items_0/src/scalar_ops.rs b/items_0/src/scalar_ops.rs index bfdc8eb..c071f0e 100644 --- a/items_0/src/scalar_ops.rs +++ b/items_0/src/scalar_ops.rs @@ -60,7 +60,7 @@ impl AsPrimF32 for String { } pub trait ScalarOps: - fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static + fmt::Debug + Clone + PartialOrd + PartialEq + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static { fn zero_b() -> Self; fn equal_slack(&self, rhs: &Self) -> bool; diff --git a/items_0/src/streamitem.rs b/items_0/src/streamitem.rs index ade3362..ff79691 100644 --- a/items_0/src/streamitem.rs +++ b/items_0/src/streamitem.rs @@ -142,38 +142,4 @@ mod levelserde { } } -pub trait ContainsError { - fn is_err(&self) -> bool; - fn err(&self) -> Option<&::err::Error>; -} - -impl ContainsError for Box -where - T: ContainsError, -{ - fn is_err(&self) -> bool { - self.as_ref().is_err() - } - - fn err(&self) -> Option<&::err::Error> { - self.as_ref().err() - } -} - -impl ContainsError for Sitemty { - fn is_err(&self) -> bool { - match self { - Ok(_) => false, - Err(_) => true, - } - } - - fn err(&self) -> Option<&::err::Error> { - match self { - Ok(_) => None, - Err(e) => Some(e), - } - } -} - erased_serde::serialize_trait_object!(TimeBinned); diff --git a/items_0/src/transform.rs b/items_0/src/transform.rs deleted file mode 100644 index e69de29..0000000 diff --git a/items_2/Cargo.toml b/items_2/Cargo.toml index bc4ca4a..9823f3b 100644 --- a/items_2/Cargo.toml +++ b/items_2/Cargo.toml @@ -22,7 +22,6 @@ tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] } humantime-serde = "1.1.1" rmp-serde = "1.1.1" err = { path = "../err" } -items = { path = "../items" } items_0 = { path = "../items_0" } items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } diff --git a/items_2/src/binnedcollected.rs b/items_2/src/binnedcollected.rs index 41bf1fe..5621158 100644 --- a/items_2/src/binnedcollected.rs +++ b/items_2/src/binnedcollected.rs @@ -11,9 +11,9 @@ use items_0::collect_s::ToJsonResult; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::EventTransform; use items_0::TimeBinnable; use items_0::TimeBinner; -use items_0::Transformer; use netpod::log::*; use netpod::BinnedRange; use netpod::BinnedRangeEnum; @@ -69,7 +69,7 @@ fn _old_binned_collected( scalar_type: ScalarType, shape: Shape, binrange: BinnedRangeEnum, - transformer: &dyn Transformer, + transformer: &dyn EventTransform, deadline: Instant, inp: Pin> + Send>>, ) -> Result { diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 12e3649..0a2d385 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -18,10 +18,12 @@ use items_0::TimeBinned; use items_0::TimeBinner; use items_0::TimeBins; use items_0::WithLen; +use netpod::is_false; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; +use netpod::CmpZero; use netpod::Dim0Kind; use serde::Deserialize; use serde::Serialize; @@ -303,11 +305,11 @@ pub struct BinsDim0CollectedResult { maxs: VecDeque, #[serde(rename = "avgs")] avgs: VecDeque, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] timed_out: bool, - #[serde(rename = "missingBins", default, skip_serializing_if = "crate::is_zero_u32")] + #[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")] missing_bins: u32, #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] continue_at: Option, diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs index b59bb16..063d017 100644 --- a/items_2/src/binsxbindim0.rs +++ b/items_2/src/binsxbindim0.rs @@ -21,13 +21,14 @@ use items_0::TimeBinned; use items_0::TimeBinner; use items_0::TimeBins; use items_0::WithLen; +use netpod::is_false; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; use netpod::Dim0Kind; -use num_traits::Zero; +use netpod::CmpZero; use serde::Deserialize; use serde::Serialize; use std::any; @@ -269,11 +270,11 @@ pub struct BinsXbinDim0CollectedResult { maxs: VecDeque, #[serde(rename = "avgs")] avgs: VecDeque, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] timed_out: bool, - #[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")] + #[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")] missing_bins: u32, #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] continue_at: Option, diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 555ad5d..ae6d7bc 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -1,10 +1,10 @@ use crate::framable::FrameType; -use crate::merger; use crate::merger::Mergeable; use crate::Events; use items_0::collect_s::Collectable; use items_0::collect_s::Collected; use items_0::collect_s::Collector; +use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; use items_0::AsAnyMut; @@ -12,9 +12,7 @@ use items_0::AsAnyRef; use items_0::MergeError; use items_0::WithLen; use netpod::log::*; -use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; -use netpod::BinnedRange; use netpod::BinnedRangeEnum; use serde::Deserialize; use serde::Serialize; @@ -55,6 +53,13 @@ impl ConnStatusEvent { } } +impl ByteEstimate for ConnStatusEvent { + fn byte_estimate(&self) -> u64 { + // TODO magic number, but maybe good enough + 32 + } +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum ChannelStatus { Connect, @@ -86,6 +91,13 @@ impl ChannelStatusEvent { } } +impl ByteEstimate for ChannelStatusEvent { + fn byte_estimate(&self) -> u64 { + // TODO magic number, but maybe good enough + 32 + } +} + /// Events on a channel consist not only of e.g. timestamped values, but can be also /// connection status changes. #[derive(Debug)] @@ -490,6 +502,18 @@ impl WithLen for ChannelEvents { } } +impl ByteEstimate for ChannelEvents { + fn byte_estimate(&self) -> u64 { + match self { + ChannelEvents::Events(k) => k.byte_estimate(), + ChannelEvents::Status(k) => match k { + Some(k) => k.byte_estimate(), + None => 0, + }, + } + } +} + impl Mergeable for ChannelEvents { fn ts_min(&self) -> Option { match self { diff --git a/items_2/src/eventfull.rs b/items_2/src/eventfull.rs index 5f02042..4e89ee8 100644 --- a/items_2/src/eventfull.rs +++ b/items_2/src/eventfull.rs @@ -1,8 +1,7 @@ use crate::framable::FrameType; use crate::merger::Mergeable; use bytes::BytesMut; -use items::ByteEstimate; -use items::WithTimestamps; +use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::EVENT_FULL_FRAME_TYPE_ID; use items_0::Empty; @@ -29,6 +28,7 @@ pub struct EventFull { pub be: VecDeque, pub shapes: VecDeque, pub comps: VecDeque>, + pub entry_payload_max: u64, } #[allow(unused)] @@ -81,6 +81,9 @@ impl EventFull { shape: Shape, comp: Option, ) { + let m1 = blob.as_ref().map_or(0, |x| x.len()); + let m2 = decomp.as_ref().map_or(0, |x| x.len()); + self.entry_payload_max = self.entry_payload_max.max(m1 as u64 + m2 as u64); self.tss.push_back(ts); self.pulses.push_back(pulse); self.blobs.push_back(blob); @@ -91,6 +94,7 @@ impl EventFull { self.comps.push_back(comp); } + // TODO possible to get rid of this? pub fn truncate_ts(&mut self, end: u64) { let mut nkeep = usize::MAX; for (i, &ts) in self.tss.iter().enumerate() { @@ -131,6 +135,7 @@ impl Empty for EventFull { be: VecDeque::new(), shapes: VecDeque::new(), comps: VecDeque::new(), + entry_payload_max: 0, } } } @@ -141,22 +146,9 @@ impl WithLen for EventFull { } } -impl WithTimestamps for EventFull { - fn ts(&self, ix: usize) -> u64 { - self.tss[ix] - } -} - impl ByteEstimate for EventFull { fn byte_estimate(&self) -> u64 { - if self.len() == 0 { - 0 - } else { - // TODO that is clumsy... it assumes homogenous types. - // TODO improve via a const fn on NTY - let decomp_len = self.decomps[0].as_ref().map_or(0, |h| h.len()); - self.tss.len() as u64 * (40 + self.blobs[0].as_ref().map_or(0, |x| x.len()) as u64 + decomp_len as u64) - } + self.len() as u64 * (64 + self.entry_payload_max) } } @@ -176,6 +168,13 @@ impl Mergeable for EventFull { fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { // TODO make it harder to forget new members when the struct may get modified in the future let r = range.0..range.1; + let mut max = dst.entry_payload_max; + for i in r.clone() { + let m1 = self.blobs[i].as_ref().map_or(0, |x| x.len()); + let m2 = self.decomps[i].as_ref().map_or(0, |x| x.len()); + max = max.max(m1 as u64 + m2 as u64); + } + dst.entry_payload_max = max; dst.tss.extend(self.tss.drain(r.clone())); dst.pulses.extend(self.pulses.drain(r.clone())); dst.blobs.extend(self.blobs.drain(r.clone())); diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index fd7358c..1b6c8ba 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -10,6 +10,7 @@ use items_0::collect_s::Collector; use items_0::collect_s::CollectorType; use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; +use items_0::container::ByteEstimate; use items_0::scalar_ops::ScalarOps; use items_0::Appendable; use items_0::AsAnyMut; @@ -21,6 +22,7 @@ use items_0::MergeError; use items_0::TimeBinnable; use items_0::TimeBinner; use items_0::WithLen; +use netpod::is_false; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; @@ -125,6 +127,13 @@ impl WithLen for EventsDim0 { } } +impl ByteEstimate for EventsDim0 { + fn byte_estimate(&self) -> u64 { + let stylen = mem::size_of::(); + (self.len() * (8 + 8 + stylen)) as u64 + } +} + impl RangeOverlapInfo for EventsDim0 { fn ends_before(&self, range: &SeriesRange) -> bool { if range.is_time() { @@ -238,9 +247,9 @@ pub struct EventsDim0CollectorOutput { pulse_off: VecDeque, #[serde(rename = "values")] values: VecDeque, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] timed_out: bool, #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] continue_at: Option, @@ -1122,7 +1131,7 @@ mod test_frame { use items_0::streamitem::StreamItem; #[test] - fn events_bincode() { + fn events_serialize() { taskrun::tracing_init().unwrap(); let mut events = EventsDim0::empty(); events.push(123, 234, 55f32); diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index f58cabf..50aeedc 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -10,6 +10,7 @@ use items_0::collect_s::Collected; use items_0::collect_s::CollectorType; use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; +use items_0::container::ByteEstimate; use items_0::scalar_ops::ScalarOps; use items_0::Appendable; use items_0::AsAnyMut; @@ -21,6 +22,7 @@ use items_0::MergeError; use items_0::TimeBinnable; use items_0::TimeBinner; use items_0::WithLen; +use netpod::is_false; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::timeunits::SEC; @@ -31,6 +33,7 @@ use std::any; use std::any::Any; use std::collections::VecDeque; use std::fmt; +use std::mem; #[allow(unused)] macro_rules! trace2 { @@ -130,6 +133,14 @@ impl WithLen for EventsDim1 { } } +impl ByteEstimate for EventsDim1 { + fn byte_estimate(&self) -> u64 { + let stylen = mem::size_of::(); + let n = self.values.front().map_or(0, Vec::len); + (self.len() * (8 + 8 + n * stylen)) as u64 + } +} + impl RangeOverlapInfo for EventsDim1 { fn ends_before(&self, range: &SeriesRange) -> bool { todo!() @@ -199,9 +210,9 @@ pub struct EventsDim1CollectorOutput { pulse_off: VecDeque, #[serde(rename = "values")] values: VecDeque>, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] timed_out: bool, #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] continue_at: Option, diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index c979a16..d5bc1af 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -1,4 +1,5 @@ use crate::binsxbindim0::BinsXbinDim0; +use items_0::container::ByteEstimate; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; @@ -14,6 +15,7 @@ use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; use items_0::WithLen; +use netpod::is_false; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; @@ -69,7 +71,7 @@ where } } -impl items::ByteEstimate for EventsXbinDim0 { +impl ByteEstimate for EventsXbinDim0 { fn byte_estimate(&self) -> u64 { todo!("byte_estimate") } @@ -365,9 +367,9 @@ pub struct EventsXbinDim0CollectorOutput { maxs: VecDeque, #[serde(rename = "avgs")] avgs: VecDeque, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] + #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] timed_out: bool, #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] continue_at: Option, diff --git a/items_2/src/framable.rs b/items_2/src/framable.rs index 4cdaf6e..4721aab 100644 --- a/items_2/src/framable.rs +++ b/items_2/src/framable.rs @@ -90,7 +90,7 @@ where } pub trait FrameDecodable: FrameTypeStatic + DeserializeOwned { - fn from_error(e: ::err::Error) -> Self; + fn from_error(e: err::Error) -> Self; fn from_log(item: LogItem) -> Self; fn from_stats(item: StatsItem) -> Self; fn from_range_complete() -> Self; @@ -148,3 +148,34 @@ where } } } + +#[test] +fn test_frame_log() { + use crate::channelevents::ChannelEvents; + use crate::frame::decode_from_slice; + use netpod::log::Level; + let item = LogItem { + node_ix: 123, + level: Level::TRACE, + msg: format!("test-log-message"), + }; + let item: Sitemty = Ok(StreamItem::Log(item)); + let buf = Framable::make_frame(&item).unwrap(); + let len = u32::from_le_bytes(buf[12..16].try_into().unwrap()); + let item2: LogItem = decode_from_slice(&buf[20..20 + len as usize]).unwrap(); +} + +#[test] +fn test_frame_error() { + use crate::channelevents::ChannelEvents; + use crate::frame::decode_from_slice; + let item: Sitemty = Err(Error::with_msg_no_trace(format!("dummy-error-message"))); + let buf = Framable::make_frame(&item).unwrap(); + let len = u32::from_le_bytes(buf[12..16].try_into().unwrap()); + let tyid = u32::from_le_bytes(buf[8..12].try_into().unwrap()); + if tyid != ERROR_FRAME_TYPE_ID { + panic!("bad tyid"); + } + eprintln!("buf len {} len {}", buf.len(), len); + let item2: Error = decode_from_slice(&buf[20..20 + len as usize]).unwrap(); +} diff --git a/items_2/src/frame.rs b/items_2/src/frame.rs index 519cde5..5ebe4e3 100644 --- a/items_2/src/frame.rs +++ b/items_2/src/frame.rs @@ -1,5 +1,4 @@ use crate::framable::FrameDecodable; -use crate::framable::FrameType; use crate::framable::INMEM_FRAME_ENCID; use crate::framable::INMEM_FRAME_HEAD; use crate::framable::INMEM_FRAME_MAGIC; @@ -15,7 +14,6 @@ use bytes::BufMut; use bytes::BytesMut; use err::Error; use items_0::bincode; -use items_0::streamitem::ContainsError; use items_0::streamitem::LogItem; use items_0::streamitem::StatsItem; use items_0::streamitem::ERROR_FRAME_TYPE_ID; @@ -23,9 +21,10 @@ use items_0::streamitem::LOG_FRAME_TYPE_ID; use items_0::streamitem::RANGE_COMPLETE_FRAME_TYPE_ID; use items_0::streamitem::STATS_FRAME_TYPE_ID; use items_0::streamitem::TERM_FRAME_TYPE_ID; -#[allow(unused)] use netpod::log::*; use serde::Serialize; +use std::any; +use std::io; trait EC { fn ec(self) -> err::Error; @@ -43,17 +42,6 @@ impl EC for rmp_serde::decode::Error { } } -pub fn make_frame(item: &FT) -> Result -where - FT: FrameType + ContainsError + Serialize, -{ - if item.is_err() { - make_error_frame(item.err().unwrap()) - } else { - make_frame_2(item, item.frame_type_id()) - } -} - pub fn bincode_ser( w: W, ) -> bincode::Serializer< @@ -64,7 +52,7 @@ pub fn bincode_ser( >, > where - W: std::io::Write, + W: io::Write, { use bincode::Options; let opts = DefaultOptions::new() @@ -98,73 +86,83 @@ where ::deserialize(&mut de).map_err(|e| format!("{e}").into()) } -pub fn encode_to_vec(item: S) -> Result, Error> +pub fn msgpack_to_vec(item: T) -> Result, Error> where - S: Serialize, + T: Serialize, { - if false { - serde_json::to_vec(&item).map_err(|e| e.into()) - } else { - bincode_to_vec(&item) - } + rmp_serde::to_vec_named(&item).map_err(|e| format!("{e}").into()) +} + +pub fn msgpack_erased_to_vec(item: T) -> Result, Error> +where + T: erased_serde::Serialize, +{ + let mut out = Vec::new(); + let mut ser1 = rmp_serde::Serializer::new(&mut out).with_struct_map(); + let mut ser2 = ::erase(&mut ser1); + item.erased_serialize(&mut ser2) + .map_err(|e| Error::from(format!("{e}")))?; + Ok(out) +} + +pub fn msgpack_from_slice(buf: &[u8]) -> Result +where + T: for<'de> serde::Deserialize<'de>, +{ + rmp_serde::from_slice(buf).map_err(|e| format!("{e}").into()) +} + +pub fn encode_to_vec(item: T) -> Result, Error> +where + T: Serialize, +{ + msgpack_to_vec(item) +} + +pub fn encode_erased_to_vec(item: T) -> Result, Error> +where + T: erased_serde::Serialize, +{ + msgpack_erased_to_vec(item) } pub fn decode_from_slice(buf: &[u8]) -> Result where T: for<'de> serde::Deserialize<'de>, { - if false { - serde_json::from_slice(buf).map_err(|e| e.into()) - } else { - bincode_from_slice(buf) - } + msgpack_from_slice(buf) } -pub fn make_frame_2(item: &T, fty: u32) -> Result +pub fn make_frame_2(item: T, fty: u32) -> Result where T: erased_serde::Serialize, { - let mut out = Vec::new(); - //let mut ser = rmp_serde::Serializer::new(&mut out).with_struct_map(); - //let writer = ciborium::ser::into_writer(&item, &mut out).unwrap(); - let mut ser = bincode_ser(&mut out); - let mut ser2 = ::erase(&mut ser); - //let mut ser = serde_json::Serializer::new(&mut out); - //let mut ser2 = ::erase(&mut ser); - match item.erased_serialize(&mut ser2) { - Ok(_) => { - let enc = out; - if enc.len() > u32::MAX as usize { - return Err(Error::with_msg(format!("too long payload {}", enc.len()))); - } - let mut h = crc32fast::Hasher::new(); - h.update(&enc); - let payload_crc = h.finalize(); - // TODO reserve also for footer via constant - let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); - buf.put_u32_le(INMEM_FRAME_MAGIC); - buf.put_u32_le(INMEM_FRAME_ENCID); - buf.put_u32_le(fty); - buf.put_u32_le(enc.len() as u32); - buf.put_u32_le(payload_crc); - // TODO add padding to align to 8 bytes. - //trace!("enc len {}", enc.len()); - //trace!("payload_crc {}", payload_crc); - buf.put(enc.as_ref()); - let mut h = crc32fast::Hasher::new(); - h.update(&buf); - let frame_crc = h.finalize(); - buf.put_u32_le(frame_crc); - //trace!("frame_crc {}", frame_crc); - Ok(buf) - } - Err(e) => Err(e)?, + let enc = encode_erased_to_vec(item)?; + if enc.len() > u32::MAX as usize { + return Err(Error::with_msg(format!("too long payload {}", enc.len()))); } + let mut h = crc32fast::Hasher::new(); + h.update(&enc); + let payload_crc = h.finalize(); + // TODO reserve also for footer via constant + let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); + buf.put_u32_le(INMEM_FRAME_MAGIC); + buf.put_u32_le(INMEM_FRAME_ENCID); + buf.put_u32_le(fty); + buf.put_u32_le(enc.len() as u32); + buf.put_u32_le(payload_crc); + // TODO add padding to align to 8 bytes. + buf.put(enc.as_ref()); + let mut h = crc32fast::Hasher::new(); + h.update(&buf); + let frame_crc = h.finalize(); + buf.put_u32_le(frame_crc); + return Ok(buf); } // TODO remove duplication for these similar `make_*_frame` functions: -pub fn make_error_frame(error: &::err::Error) -> Result { +pub fn make_error_frame(error: &err::Error) -> Result { match encode_to_vec(error) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); @@ -176,24 +174,18 @@ pub fn make_error_frame(error: &::err::Error) -> Result { buf.put_u32_le(ERROR_FRAME_TYPE_ID); buf.put_u32_le(enc.len() as u32); buf.put_u32_le(payload_crc); - // TODO add padding to align to 8 bytes. - //trace!("enc len {}", enc.len()); - //trace!("payload_crc {}", payload_crc); buf.put(enc.as_ref()); let mut h = crc32fast::Hasher::new(); h.update(&buf); let frame_crc = h.finalize(); buf.put_u32_le(frame_crc); - //trace!("frame_crc {}", frame_crc); Ok(buf) } Err(e) => Err(e)?, } } -// TODO can I remove this usage? pub fn make_log_frame(item: &LogItem) -> Result { - warn!("make_log_frame {item:?}"); match encode_to_vec(item) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); @@ -203,10 +195,8 @@ pub fn make_log_frame(item: &LogItem) -> Result { buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(LOG_FRAME_TYPE_ID); - warn!("make_log_frame payload len {}", enc.len()); buf.put_u32_le(enc.len() as u32); buf.put_u32_le(payload_crc); - // TODO add padding to align to 8 bytes. buf.put(enc.as_ref()); let mut h = crc32fast::Hasher::new(); h.update(&buf); @@ -230,7 +220,6 @@ pub fn make_stats_frame(item: &StatsItem) -> Result { buf.put_u32_le(STATS_FRAME_TYPE_ID); buf.put_u32_le(enc.len() as u32); buf.put_u32_le(payload_crc); - // TODO add padding to align to 8 bytes. buf.put(enc.as_ref()); let mut h = crc32fast::Hasher::new(); h.update(&buf); @@ -253,7 +242,6 @@ pub fn make_range_complete_frame() -> Result { buf.put_u32_le(RANGE_COMPLETE_FRAME_TYPE_ID); buf.put_u32_le(enc.len() as u32); buf.put_u32_le(payload_crc); - // TODO add padding to align to 8 bytes. buf.put(enc.as_ref()); let mut h = crc32fast::Hasher::new(); h.update(&buf); @@ -273,7 +261,6 @@ pub fn make_term_frame() -> Result { buf.put_u32_le(TERM_FRAME_TYPE_ID); buf.put_u32_le(enc.len() as u32); buf.put_u32_le(payload_crc); - // TODO add padding to align to 8 bytes. buf.put(enc.as_ref()); let mut h = crc32fast::Hasher::new(); h.update(&buf); @@ -298,11 +285,15 @@ where ))); } if frame.tyid() == ERROR_FRAME_TYPE_ID { - let k: ::err::Error = match decode_from_slice(frame.buf()) { + let k: err::Error = match decode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { - error!("ERROR deserialize len {} ERROR_FRAME_TYPE_ID", frame.buf().len()); - let n = frame.buf().len().min(128); + error!( + "ERROR deserialize len {} ERROR_FRAME_TYPE_ID {}", + frame.buf().len(), + e + ); + let n = frame.buf().len().min(256); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); Err(e)? @@ -313,7 +304,7 @@ where let k: LogItem = match decode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { - error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID", frame.buf().len()); + error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID {}", frame.buf().len(), e); let n = frame.buf().len().min(128); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); @@ -325,7 +316,11 @@ where let k: StatsItem = match decode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { - error!("ERROR deserialize len {} STATS_FRAME_TYPE_ID", frame.buf().len()); + error!( + "ERROR deserialize len {} STATS_FRAME_TYPE_ID {}", + frame.buf().len(), + e + ); let n = frame.buf().len().min(128); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); @@ -349,7 +344,7 @@ where match decode_from_slice(frame.buf()) { Ok(item) => Ok(item), Err(e) => { - error!("decode_frame T = {}", std::any::type_name::()); + error!("decode_frame T = {}", any::type_name::()); error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid()); let n = frame.buf().len().min(64); let s = String::from_utf8_lossy(&frame.buf()[..n]); diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 5a9d475..3510ce7 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -29,7 +29,6 @@ use items_0::Events; use items_0::MergeError; use items_0::RangeOverlapInfo; use merger::Mergeable; -use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; use netpod::timeunits::*; use serde::Deserialize; @@ -38,14 +37,6 @@ use serde::Serializer; use std::collections::VecDeque; use std::fmt; -pub fn bool_is_false(x: &bool) -> bool { - *x == false -} - -pub fn is_zero_u32(x: &u32) -> bool { - *x == 0 -} - pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque, VecDeque) { let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC; let ts_anchor_ns = ts_anchor_sec * SEC; @@ -208,9 +199,9 @@ pub trait TimeBinnableTypeAggregator: Send { fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output; } -pub trait ChannelEventsInput: Stream> + items_0::Transformer + Send {} +pub trait ChannelEventsInput: Stream> + items_0::EventTransform + Send {} -impl ChannelEventsInput for T where T: Stream> + items_0::Transformer + Send {} +impl ChannelEventsInput for T where T: Stream> + items_0::EventTransform + Send {} pub fn runfut(fut: F) -> Result where diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index b5b92eb..f88df93 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -2,6 +2,7 @@ pub use crate::Error; use futures_util::Stream; use futures_util::StreamExt; +use items_0::container::ByteEstimate; use items_0::streamitem::sitem_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; @@ -16,6 +17,8 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; +const OUT_MAX_BYTES: u64 = 1024 * 200; + #[allow(unused)] macro_rules! trace2 { (__$($arg:tt)*) => (); @@ -34,7 +37,7 @@ macro_rules! trace4 { ($($arg:tt)*) => (trace!($($arg)*)); } -pub trait Mergeable: fmt::Debug + WithLen + Unpin { +pub trait Mergeable: fmt::Debug + WithLen + ByteEstimate + Unpin { fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; fn new_empty(&self) -> Self; @@ -316,7 +319,7 @@ where if let Some(o) = self.out.as_ref() { // A good threshold varies according to scalar type and shape. // TODO replace this magic number by a bound on the bytes estimate. - if o.len() >= self.out_max_len || self.do_clear_out { + if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out { trace3!("decide to output"); self.do_clear_out = false; Break(Ready(Some(Ok(self.out.take().unwrap())))) @@ -409,7 +412,7 @@ where } } -impl items_0::Transformer for Merger { +impl items_0::EventTransform for Merger { fn query_transform_properties(&self) -> items_0::TransformProperties { todo!() } diff --git a/items_2/src/streams.rs b/items_2/src/streams.rs index efb1147..ee9f963 100644 --- a/items_2/src/streams.rs +++ b/items_2/src/streams.rs @@ -2,8 +2,8 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use items_0::EventTransform; use items_0::TransformProperties; -use items_0::Transformer; use std::collections::VecDeque; use std::pin::Pin; use std::task::Context; @@ -17,7 +17,7 @@ pub struct Enumerate2 { impl Enumerate2 { pub fn new(inp: T) -> Self where - T: Transformer, + T: EventTransform, { Self { inp, cnt: 0 } } @@ -43,7 +43,7 @@ where } } -impl Transformer for Enumerate2 { +impl EventTransform for Enumerate2 { fn query_transform_properties(&self) -> TransformProperties { todo!() } @@ -114,7 +114,7 @@ where } } -impl Transformer for Then2 { +impl EventTransform for Then2 { fn query_transform_properties(&self) -> TransformProperties { todo!() } @@ -123,11 +123,11 @@ impl Transformer for Then2 { pub trait TransformerExt { fn enumerate2(self) -> Enumerate2 where - Self: Transformer + Sized; + Self: EventTransform + Sized; fn then2(self, f: F) -> Then2 where - Self: Transformer + Stream + Sized, + Self: EventTransform + Stream + Sized, F: Fn(::Item) -> Fut, Fut: Future; } @@ -135,14 +135,14 @@ pub trait TransformerExt { impl TransformerExt for T { fn enumerate2(self) -> Enumerate2 where - Self: Transformer + Sized, + Self: EventTransform + Sized, { Enumerate2::new(self) } fn then2(self, f: F) -> Then2 where - Self: Transformer + Stream + Sized, + Self: EventTransform + Stream + Sized, F: Fn(::Item) -> Fut, Fut: Future, { @@ -178,7 +178,7 @@ where } } -impl Transformer for VecStream { +impl EventTransform for VecStream { fn query_transform_properties(&self) -> TransformProperties { todo!() } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 73128bd..57adfad 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -4,6 +4,10 @@ pub mod range; pub mod status; pub mod streamext; +pub mod log { + pub use tracing::{self, debug, error, event, info, span, trace, warn, Level}; +} + use crate::log::*; use bytes::Bytes; use chrono::DateTime; @@ -48,6 +52,16 @@ where *x.borrow() == false } +pub trait CmpZero { + fn is_zero(&self) -> bool; +} + +impl CmpZero for u32 { + fn is_zero(&self) -> bool { + *self == 0 + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AggQuerySingleChannel { pub channel_config: ChannelConfig, @@ -1055,9 +1069,28 @@ pub trait Dim0Index: Clone + fmt::Debug + PartialOrd { fn to_binned_range_enum(&self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum; } -#[derive(Clone, Serialize, Deserialize, PartialEq, PartialOrd)] +#[derive(Clone, Deserialize, PartialEq, PartialOrd)] pub struct TsNano(pub u64); +mod ts_nano_ser { + use super::TsNano; + use crate::timeunits::SEC; + use chrono::TimeZone; + use chrono::Utc; + use serde::Serialize; + + impl Serialize for TsNano { + fn serialize(&self, ser: S) -> Result + where + S: serde::Serializer, + { + let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32); + let value = format!("{}", ts.earliest().unwrap()); + ser.serialize_newtype_struct("TsNano", &value) + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)] pub struct PulseId(u64); @@ -1074,7 +1107,9 @@ impl TsNano { impl fmt::Debug for TsNano { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32); - f.debug_struct("TsNano").field("ns", &ts).finish() + f.debug_struct("TsNano") + .field("ts", &ts.earliest().unwrap_or(Default::default())) + .finish() } } @@ -1855,11 +1890,6 @@ where } } -pub mod log { - #[allow(unused_imports)] - pub use tracing::{self, debug, error, event, info, span, trace, warn, Level}; -} - #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct EventDataReadStats { pub parsed_bytes: u64, diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index 574ece4..fd76b0d 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -4,9 +4,6 @@ version = "0.0.2" authors = ["Dominik Werder "] edition = "2021" -[lib] -path = "src/nodenet.rs" - [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -29,7 +26,6 @@ netpod = { path = "../netpod" } query = { path = "../query" } disk = { path = "../disk" } #parse = { path = "../parse" } -items = { path = "../items" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } dbconn = { path = "../dbconn" } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 4bec02f..ab93702 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -376,14 +376,9 @@ async fn events_conn_handler_inner( match events_conn_handler_inner_try(stream, addr, node_config).await { Ok(_) => (), Err(ce) => { - error!("events_conn_handler_inner sees error {:?}", ce.err); - // Try to pass the error over the network. - // If that fails, give error to the caller. let mut out = ce.netout; - let e = ce.err; - let buf = make_error_frame(&e)?; - //type T = StreamItem>>; - //let buf = Err::(e).make_frame()?; + let item: Sitemty = Err(ce.err); + let buf = Framable::make_frame(&item)?; out.write_all(&buf).await?; } } diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index b34a292..70d0384 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -1,6 +1,6 @@ use crate::conn::events_conn_handler; use futures_util::StreamExt; -use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::sitem_data; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::streamitem::ERROR_FRAME_TYPE_ID; @@ -9,11 +9,10 @@ use items_0::streamitem::LOG_FRAME_TYPE_ID; use items_0::streamitem::STATS_FRAME_TYPE_ID; use items_2::channelevents::ChannelEvents; use items_2::framable::EventQueryJsonStringFrame; +use items_2::framable::Framable; use items_2::frame::decode_frame; -use items_2::frame::make_frame; use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; -use netpod::AggKind; use netpod::Channel; use netpod::Cluster; use netpod::Database; @@ -21,9 +20,9 @@ use netpod::FileIoBufferSize; use netpod::Node; use netpod::NodeConfig; use netpod::NodeConfigCached; +use netpod::PerfOpts; use netpod::SfDatabuffer; use query::api4::events::PlainEventsQuery; -use std::time::Duration; use streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; @@ -83,15 +82,15 @@ fn raw_data_00() { }; let qu = PlainEventsQuery::new(channel, range); let query = EventQueryJsonStringFrame(serde_json::to_string(&qu).unwrap()); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(query))); - let frame = make_frame(&item).unwrap(); + let frame = sitem_data(query).make_frame()?; let jh = taskrun::spawn(events_conn_handler(client, addr, cfg)); con.write_all(&frame).await.unwrap(); eprintln!("written"); con.shutdown().await.unwrap(); eprintln!("shut down"); - let mut frames = InMemoryFrameAsyncReadStream::new(con, 1024 * 128); + let perf_opts = PerfOpts::default(); + let mut frames = InMemoryFrameAsyncReadStream::new(con, perf_opts.inmem_bufcap); while let Some(frame) = frames.next().await { match frame { Ok(frame) => match frame { diff --git a/nodenet/src/nodenet.rs b/nodenet/src/lib.rs similarity index 100% rename from nodenet/src/nodenet.rs rename to nodenet/src/lib.rs diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index d523805..825c2af 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -80,7 +80,7 @@ impl CompressionMethod { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ConfigEntry { - pub ts: u64, + pub ts: TsNano, pub pulse: i64, pub ks: i32, pub bs: TsNano, @@ -130,8 +130,8 @@ impl ConfigEntry { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct Config { +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChannelConfigs { pub format_version: i16, pub channel_name: String, pub entries: Vec, @@ -238,7 +238,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes> { Ok(( inp_e, Some(ConfigEntry { - ts: ts as u64, + ts: TsNano::from_ns(ts as u64), pulse, ks, bs, @@ -267,7 +267,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes> { /** Parse a complete configuration file from given in-memory input buffer. */ -pub fn parse_config(inp: &[u8]) -> NRes { +pub fn parse_config(inp: &[u8]) -> NRes { let (inp, ver) = be_i16(inp)?; let (inp, len1) = be_i32(inp)?; if len1 <= 8 || len1 > 500 { @@ -294,7 +294,7 @@ pub fn parse_config(inp: &[u8]) -> NRes { return mkerr(format!("channelName utf8 error {:?}", e)); } }; - let ret = Config { + let ret = ChannelConfigs { format_version: ver, channel_name: channel_name, entries: entries, @@ -320,7 +320,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result Result { +pub async fn read_local_config(channel: Channel, node: Node) -> Result { let path = node .sf_databuffer .as_ref() @@ -360,18 +360,18 @@ pub enum MatchingConfigEntry<'a> { pub fn extract_matching_config_entry<'a>( range: &NanoRange, - channel_config: &'a Config, + channel_config: &'a ChannelConfigs, ) -> Result, Error> { - let mut ixs = vec![]; + let mut ixs = Vec::new(); for i1 in 0..channel_config.entries.len() { let e1 = &channel_config.entries[i1]; if i1 + 1 < channel_config.entries.len() { let e2 = &channel_config.entries[i1 + 1]; - if e1.ts < range.end && e2.ts >= range.beg { + if e1.ts.ns() < range.end && e2.ts.ns() >= range.beg { ixs.push(i1); } } else { - if e1.ts < range.end { + if e1.ts.ns() < range.end { ixs.push(i1); } } @@ -397,7 +397,7 @@ mod test { let path = "../resources/sf-daqbuf-33-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config"; //let path = "../resources/sf-daqbuf-21-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config"; let mut f1 = std::fs::File::open(path).unwrap(); - let mut buf = vec![]; + let mut buf = Vec::new(); f1.read_to_end(&mut buf).unwrap(); buf } @@ -415,8 +415,8 @@ mod test { assert_eq!(config.format_version, 0); assert_eq!(config.entries.len(), 18); for e in &config.entries { - assert!(e.ts >= 631152000000000000); - assert!(e.ts <= 1613640673424172164); + assert!(e.ts.ns() >= 631152000000000000); + assert!(e.ts.ns() <= 1613640673424172164); assert!(e.shape.is_some()); } } diff --git a/query/src/transform.rs b/query/src/transform.rs index 0323ceb..1e944ff 100644 --- a/query/src/transform.rs +++ b/query/src/transform.rs @@ -9,7 +9,7 @@ use std::collections::BTreeMap; use url::Url; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum EventTransform { +enum EventTransformQuery { EventBlobsVerbatim, EventBlobsUncompressed, ValueFull, @@ -20,7 +20,7 @@ pub enum EventTransform { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum TimeBinningTransform { +enum TimeBinningTransformQuery { None, TimeWeighted, Unweighted, @@ -28,8 +28,8 @@ pub enum TimeBinningTransform { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TransformQuery { - event: EventTransform, - time_binning: TimeBinningTransform, + event: EventTransformQuery, + time_binning: TimeBinningTransformQuery, } impl TransformQuery { @@ -39,15 +39,15 @@ impl TransformQuery { pub fn default_events() -> Self { Self { - event: EventTransform::ValueFull, - time_binning: TimeBinningTransform::None, + event: EventTransformQuery::ValueFull, + time_binning: TimeBinningTransformQuery::None, } } pub fn default_time_binned() -> Self { Self { - event: EventTransform::MinMaxAvgDev, - time_binning: TimeBinningTransform::TimeWeighted, + event: EventTransformQuery::MinMaxAvgDev, + time_binning: TimeBinningTransformQuery::TimeWeighted, } } @@ -61,28 +61,30 @@ impl TransformQuery { pub fn for_event_blobs() -> Self { Self { - event: EventTransform::EventBlobsVerbatim, - time_binning: TimeBinningTransform::None, + event: EventTransformQuery::EventBlobsVerbatim, + time_binning: TimeBinningTransformQuery::None, } } pub fn for_time_weighted_scalar() -> Self { Self { - event: EventTransform::MinMaxAvgDev, - time_binning: TimeBinningTransform::TimeWeighted, + event: EventTransformQuery::MinMaxAvgDev, + time_binning: TimeBinningTransformQuery::TimeWeighted, } } pub fn is_event_blobs(&self) -> bool { match &self.event { - EventTransform::EventBlobsVerbatim => true, - EventTransform::EventBlobsUncompressed => { + EventTransformQuery::EventBlobsVerbatim => true, + EventTransformQuery::EventBlobsUncompressed => { error!("TODO decide on uncompressed event blobs"); panic!() } _ => false, } } + + pub fn build_event_transform(&self) -> () {} } impl FromUrl for TransformQuery { @@ -97,35 +99,35 @@ impl FromUrl for TransformQuery { if let Some(s) = pairs.get(key) { let ret = if s == "eventBlobs" { TransformQuery { - event: EventTransform::EventBlobsVerbatim, - time_binning: TimeBinningTransform::None, + event: EventTransformQuery::EventBlobsVerbatim, + time_binning: TimeBinningTransformQuery::None, } } else if s == "fullValue" { TransformQuery { - event: EventTransform::ValueFull, - time_binning: TimeBinningTransform::None, + event: EventTransformQuery::ValueFull, + time_binning: TimeBinningTransformQuery::None, } } else if s == "timeWeightedScalar" { TransformQuery { - event: EventTransform::MinMaxAvgDev, - time_binning: TimeBinningTransform::TimeWeighted, + event: EventTransformQuery::MinMaxAvgDev, + time_binning: TimeBinningTransformQuery::TimeWeighted, } } else if s == "unweightedScalar" { TransformQuery { - event: EventTransform::EventBlobsVerbatim, - time_binning: TimeBinningTransform::None, + event: EventTransformQuery::EventBlobsVerbatim, + time_binning: TimeBinningTransformQuery::None, } } else if s == "binnedX" { let _u: usize = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; warn!("TODO binnedXcount"); TransformQuery { - event: EventTransform::MinMaxAvgDev, - time_binning: TimeBinningTransform::None, + event: EventTransformQuery::MinMaxAvgDev, + time_binning: TimeBinningTransformQuery::None, } } else if s == "pulseIdDiff" { TransformQuery { - event: EventTransform::PulseIdDiff, - time_binning: TimeBinningTransform::None, + event: EventTransformQuery::PulseIdDiff, + time_binning: TimeBinningTransformQuery::None, } } else { return Err(Error::with_msg("can not extract binningScheme")); @@ -141,8 +143,8 @@ impl FromUrl for TransformQuery { }) .unwrap_or(None); let ret = TransformQuery { - event: EventTransform::EventBlobsVerbatim, - time_binning: TimeBinningTransform::None, + event: EventTransformQuery::EventBlobsVerbatim, + time_binning: TimeBinningTransformQuery::None, }; Ok(ret) } diff --git a/streams/Cargo.toml b/streams/Cargo.toml index f00bedd..52e27ab 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" [dependencies] tokio = { version = "1.21.2", features = ["io-util", "net", "time", "sync", "fs"] } -tracing = "0.1.26" futures-util = "0.3.15" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -21,7 +20,6 @@ wasmer = { version = "3.1.1", default-features = false, features = ["sys", "cran err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } -items = { path = "../items" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } diff --git a/streams/src/frames/eventsfromframes.rs b/streams/src/frames/eventsfromframes.rs index 38c2cf7..90019da 100644 --- a/streams/src/frames/eventsfromframes.rs +++ b/streams/src/frames/eventsfromframes.rs @@ -15,15 +15,20 @@ use std::task::Poll; pub struct EventsFromFrames { inp: Pin, Error>> + Send>>, + dbgdesc: String, errored: bool, completed: bool, _m1: PhantomData, } impl EventsFromFrames { - pub fn new(inp: Pin, Error>> + Send>>) -> Self { + pub fn new( + inp: Pin, Error>> + Send>>, + dbgdesc: String, + ) -> Self { Self { inp, + dbgdesc, errored: false, completed: false, _m1: PhantomData, @@ -39,7 +44,8 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let span = netpod::log::span!(netpod::log::Level::INFO, "EvFrFr"); + let span = span!(Level::INFO, "EvFrFr", id = tracing::field::Empty); + span.record("id", &self.dbgdesc); let _spg = span.enter(); loop { break if self.completed { @@ -50,12 +56,22 @@ where } else { match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(item))) => match item { - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Log(item) => { + info!("{} {:?} {}", item.node_ix, item.level, item.msg); + Ready(Some(Ok(StreamItem::Log(item)))) + } StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), StreamItem::DataItem(frame) => match decode_frame::>(&frame) { Ok(item) => match item { - Ok(item) => Ready(Some(Ok(item))), + Ok(item) => match item { + StreamItem::Log(k) => { + info!("rcvd log: {} {:?} {}", k.node_ix, k.level, k.msg); + Ready(Some(Ok(StreamItem::Log(k)))) + } + item => Ready(Some(Ok(item))), + }, Err(e) => { + error!("rcvd err: {}", e); self.errored = true; Ready(Some(Err(e))) } diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index 10a67bf..d2ed06b 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -12,12 +12,10 @@ use futures_util::Stream; use futures_util::StreamExt; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::sitem_data; -use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; -use items_0::streamitem::StreamItem; use items_2::eventfull::EventFull; use items_2::framable::EventQueryJsonStringFrame; -use items_2::frame::make_frame; +use items_2::framable::Framable; use items_2::frame::make_term_frame; use netpod::log::*; use netpod::Cluster; @@ -36,25 +34,21 @@ pub async fn x_processed_event_blobs_stream_from_node( perf_opts: PerfOpts, node: Node, ) -> Result> + Send>>, Error> { - debug!( - "x_processed_event_blobs_stream_from_node to: {}:{}", - node.host, node.port_raw - ); - let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; + let addr = format!("{}:{}", node.host, node.port_raw); + debug!("x_processed_event_blobs_stream_from_node to: {addr}",); + let net = TcpStream::connect(addr.clone()).await?; let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data( - EventQueryJsonStringFrame(qjs), - ))); - let buf = make_frame(&item)?; + let item = sitem_data(EventQueryJsonStringFrame(qjs)); + let buf = item.make_frame()?; netout.write_all(&buf).await?; let buf = make_term_frame()?; netout.write_all(&buf).await?; netout.flush().await?; netout.forget(); let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); - let frames = Box::pin(frames) as _; - let items = EventsFromFrames::new(frames); + let frames = Box::pin(frames); + let items = EventsFromFrames::new(frames, addr); Ok(Box::pin(items)) } @@ -69,22 +63,23 @@ where // TODO when unit tests established, change to async connect: let mut streams = Vec::new(); for node in &cluster.nodes { - debug!("open_tcp_streams to: {}:{}", node.host, node.port_raw); - let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; + let addr = format!("{}:{}", node.host, node.port_raw); + debug!("open_tcp_streams to: {addr}"); + let net = TcpStream::connect(addr.clone()).await?; let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); - let item = EventQueryJsonStringFrame(qjs); - let item = sitem_data(item); - let buf = make_frame(&item)?; + let item = sitem_data(EventQueryJsonStringFrame(qjs)); + let buf = item.make_frame()?; netout.write_all(&buf).await?; let buf = make_term_frame()?; netout.write_all(&buf).await?; netout.flush().await?; netout.forget(); // TODO for images, we need larger buffer capacity - let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 1024 * 2); - let frames = Box::pin(frames) as _; - let stream = EventsFromFrames::::new(frames); + let perf_opts = PerfOpts::default(); + let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); + let frames = Box::pin(frames); + let stream = EventsFromFrames::::new(frames, addr); let stream = stream.map(|x| { info!("tcp stream recv sees item {x:?}"); x