diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index ad4d676..6d5a1fe 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -11,7 +11,6 @@ use items_0::streamitem::StreamItem; use netpod::log::*; use netpod::query::BinnedQuery; use netpod::query::CacheUsage; -use netpod::AggKind; use netpod::AppendToUrl; use netpod::ByteSize; use netpod::Channel; @@ -67,9 +66,9 @@ pub async fn get_binned( name: channel_name.into(), series: None, }; - let agg_kind = AggKind::DimXBins1; - let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = BinnedQuery::new(channel, range, bin_count); + let range = NanoRange::from_date_time(beg_date, end_date).into(); + // TODO this was before fixed using AggKind::DimXBins1 + let mut query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar(); query.set_cache_usage(cache_usage); query.set_disk_stats_every(ByteSize(1024 * disk_stats_every_kb)); let hp = HostPort { host: host, port: port }; diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index 46b1da4..b5e4947 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -1,14 +1,18 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use crate::test::f32_cmp_near; -use chrono::{DateTime, Utc}; +use chrono::Utc; use err::Error; use http::StatusCode; use hyper::Body; +use netpod::log::*; use netpod::query::BinnedQuery; +use netpod::AppendToUrl; +use netpod::Channel; +use netpod::Cluster; +use netpod::HostPort; +use netpod::NanoRange; use netpod::APP_JSON; -use netpod::{log::*, AggKind}; -use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange}; use serde_json::Value as JsonValue; use url::Url; @@ -267,10 +271,10 @@ async fn get_binned_json( ) -> Result { let t1 = Utc::now(); let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.parse()?; - let end_date: DateTime = end_date.parse()?; - let range = NanoRange::from_date_time(beg_date, end_date); - let query = BinnedQuery::new(channel, range, bin_count, Some(AggKind::TimeWeightedScalar)); + let beg_date = beg_date.parse()?; + let end_date = end_date.parse()?; + let range = NanoRange::from_date_time(beg_date, end_date).into(); + let query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar(); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index f08a090..c38a068 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -1,14 +1,18 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use crate::test::f32_iter_cmp_near; -use chrono::{DateTime, Utc}; +use chrono::Utc; use err::Error; use http::StatusCode; use hyper::Body; +use netpod::log::*; use netpod::query::PlainEventsQuery; +use netpod::AppendToUrl; +use netpod::Channel; +use netpod::Cluster; +use netpod::HostPort; +use netpod::NanoRange; use netpod::APP_JSON; -use netpod::{log::*, AggKind}; -use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange}; use serde_json::Value as JsonValue; use url::Url; @@ -100,10 +104,10 @@ async fn events_plain_json( ) -> Result { let t1 = Utc::now(); let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.parse()?; - let end_date: DateTime = end_date.parse()?; + let beg_date = beg_date.parse()?; + let end_date = end_date.parse()?; 
let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new(channel, range, Some(AggKind::TimeWeightedScalar), None, None); + let query = PlainEventsQuery::new(channel, range).for_time_weighted_scalar(); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 13bcc2a..863b450 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -1,6 +1,5 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; -use chrono::DateTime; use chrono::Utc; use disk::streamlog::Streamlog; use err::Error; @@ -14,7 +13,6 @@ use items_0::subfr::SubFrId; use netpod::log::*; use netpod::query::BinnedQuery; use netpod::query::CacheUsage; -use netpod::AggKind; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; @@ -104,10 +102,9 @@ where NTY: fmt::Debug + SubFrId + DeserializeOwned, { let t1 = Utc::now(); - let agg_kind = AggKind::DimXBins1; let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.parse()?; - let end_date: DateTime = end_date.parse()?; + let beg_date = beg_date.parse()?; + let end_date = end_date.parse()?; let channel_backend = "testbackend"; let perf_opts = PerfOpts::default(); let channel = Channel { @@ -115,8 +112,9 @@ where name: channel_name.into(), series: None, }; - let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = BinnedQuery::new(channel, range, bin_count, Some(agg_kind)); + let range = NanoRange::from_date_time(beg_date, end_date).into(); + // TODO before, these tests were all fixed using AggKind::DimXBins1 + let mut query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar(); query.set_cache_usage(CacheUsage::Ignore); query.set_buf_len_disk_io(1024 * 16); let hp = HostPort::from_node(node0); diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 6ed0ef9..dfd3d96 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -12,7 +12,6 @@ use hyper::Body; use items_0::streamitem::StreamItem; use netpod::log::*; use netpod::query::PlainEventsQuery; -use netpod::AggKind; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; @@ -87,7 +86,7 @@ async fn get_plain_events_binary( series: None, }; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new(channel, range, Some(AggKind::TimeWeightedScalar), None, None); + let query = PlainEventsQuery::new(channel, range).for_time_weighted_scalar(); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); @@ -260,7 +259,7 @@ pub async fn get_plain_events_json( let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new(channel, range, Some(AggKind::TimeWeightedScalar), None, None); + let query = PlainEventsQuery::new(channel, range); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 7859d77..47b709b 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -1,12 +1,18 @@ use 
crate::err::ErrConv; use crate::nodes::require_test_hosts_running; -use chrono::{DateTime, Utc}; +use chrono::DateTime; +use chrono::Utc; use err::Error; use http::StatusCode; use hyper::Body; -use netpod::query::{BinnedQuery, CacheUsage}; -use netpod::{log::*, AppendToUrl}; -use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON}; +use netpod::log::*; +use netpod::query::BinnedQuery; +use netpod::query::CacheUsage; +use netpod::AppendToUrl; +use netpod::Channel; +use netpod::Cluster; +use netpod::NanoRange; +use netpod::APP_JSON; use std::time::Duration; use url::Url; @@ -20,7 +26,7 @@ fn time_weighted_json_03() -> Result<(), Error> { "1970-01-01T00:20:11.000Z", "1970-01-01T00:30:20.000Z", 10, - AggKind::TimeWeightedScalar, + //AggKind::TimeWeightedScalar, cluster, 11, true, @@ -36,6 +42,7 @@ fn time_weighted_json_03() -> Result<(), Error> { #[test] fn time_weighted_json_10() -> Result<(), Error> { async fn inner() -> Result<(), Error> { + error!("TODO this test asked for DimXBins1"); let rh = require_test_hosts_running()?; let cluster = &rh.cluster; get_json_common( @@ -43,7 +50,7 @@ fn time_weighted_json_10() -> Result<(), Error> { "1970-01-01T00:20:10.000Z", "1970-01-01T01:20:30.000Z", 10, - AggKind::DimXBins1, + //AggKind::DimXBins1, cluster, 13, true, @@ -64,7 +71,7 @@ fn time_weighted_json_20() -> Result<(), Error> { "1970-01-01T00:20:10.000Z", "1970-01-01T01:20:45.000Z", 10, - AggKind::TimeWeightedScalar, + //AggKind::TimeWeightedScalar, cluster, 13, true, @@ -86,7 +93,8 @@ async fn get_json_common( beg_date: &str, end_date: &str, bin_count: u32, - agg_kind: AggKind, + // TODO refactor for Transform + //agg_kind: AggKind, cluster: &Cluster, expect_bin_count: u32, expect_finalised_range: bool, @@ -101,8 +109,8 @@ async fn get_json_common( name: channel_name.into(), series: None, }; - let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = BinnedQuery::new(channel, range, bin_count, Some(agg_kind)); + let range = NanoRange::from_date_time(beg_date, end_date).into(); + let mut query = BinnedQuery::new(channel, range, bin_count).for_time_weighted_scalar(); query.set_timeout(Duration::from_millis(40000)); query.set_cache_usage(CacheUsage::Ignore); let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index fb8c946..ba90346 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -71,6 +71,8 @@ async fn agg_x_dim_0_inner() { event_chunker_conf, false, true, + // TODO + 32, ); let _ = fut1; // TODO add the binning and expectation and await the result. @@ -126,6 +128,8 @@ async fn agg_x_dim_1_inner() { event_chunker_conf, false, true, + // TODO + 32, ); let _ = fut1; // TODO add the binning and expectation and await the result. 
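Note on the pattern in the hunks above: every call site migrates from passing `Some(AggKind::...)` into the query constructors to builder-style methods on the query types (`for_time_weighted_scalar` here, `for_event_blobs` later in this diff). A minimal sketch of the new call shape, assuming the signatures exactly as these hunks use them; the real definitions live in `netpod` and may differ in detail:

```rust
use netpod::query::{BinnedQuery, CacheUsage};
use netpod::{AppendToUrl, Channel, NanoRange};
use url::Url;

// Sketch only: `binned_url` is a hypothetical helper, not part of the codebase.
fn binned_url(channel: Channel, range: NanoRange, bin_count: u32, host: &str, port: u16) -> Result<Url, err::Error> {
    // The NanoRange is converted `.into()` the query's range type, and the
    // aggregation is selected via a builder method instead of an AggKind arg.
    let mut query = BinnedQuery::new(channel, range.into(), bin_count).for_time_weighted_scalar();
    query.set_cache_usage(CacheUsage::Ignore);
    let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", host, port))?;
    query.append_to_url(&mut url);
    Ok(url)
}
```

`PlainEventsQuery` follows the same shape in the event tests: `PlainEventsQuery::new(channel, range).for_time_weighted_scalar()` replaces the old constructor that took `Some(AggKind::TimeWeightedScalar)` plus two `None` arguments.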
diff --git a/disk/src/decode.rs b/disk/src/decode.rs index d40c190..389d6fa 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -7,6 +7,7 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Events; +use items_0::WithLen; use items_2::eventfull::EventFull; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; @@ -345,7 +346,6 @@ impl EventsDynStream { } fn handle_event_full(&mut self, item: EventFull) -> Result<(), Error> { - use items::WithLen; if item.len() >= self.emit_threshold { info!("handle_event_full item len {}", item.len()); } diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index a5d2e92..e557588 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,7 +1,6 @@ use crate::dataopen::open_expanded_files; use crate::dataopen::open_files; use crate::dataopen::OpenedFileSet; -use crate::merge::MergedStream; use err::Error; use futures_util::Stream; use futures_util::StreamExt; @@ -10,6 +9,7 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_2::eventfull::EventFull; +use items_2::merger::Merger; use netpod::log::*; use netpod::timeunits::SEC; use netpod::ChannelConfig; @@ -21,7 +21,7 @@ use std::task::Context; use std::task::Poll; use streams::eventchunker::EventChunker; use streams::eventchunker::EventChunkerConf; -use streams::rangefilter::RangeFilter; +use streams::rangefilter2::RangeFilter2; pub trait InputTraits: Stream> {} @@ -39,6 +39,7 @@ pub struct EventChunkerMultifile { expand: bool, do_decompress: bool, max_ts: u64, + out_max_len: usize, emit_count: usize, do_emit_err_after: Option, range_final: bool, @@ -57,6 +58,7 @@ impl EventChunkerMultifile { event_chunker_conf: EventChunkerConf, expand: bool, do_decompress: bool, + out_max_len: usize, ) -> Self { info!("EventChunkerMultifile expand {expand} do_decompress {do_decompress}"); let file_chan = if expand { @@ -76,6 +78,7 @@ impl EventChunkerMultifile { expand, do_decompress, max_ts: 0, + out_max_len, emit_count: 0, do_emit_err_after: None, range_final: false, @@ -91,7 +94,7 @@ impl Stream for EventChunkerMultifile { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { //tracing::field::DebugValue; - let span1 = span!(Level::INFO, "EventChunkerMultifile", node_ix = self.node_ix); + let span1 = span!(Level::INFO, "EvChMul", node_ix = self.node_ix); //span1.record("node_ix", &self.node_ix); let _spg = span1.enter(); use Poll::*; @@ -186,7 +189,7 @@ impl Stream for EventChunkerMultifile { self.expand, self.do_decompress, ); - let filtered = RangeFilter::new(chunker, self.range.clone(), self.expand); + let filtered = RangeFilter2::new(chunker, self.range.clone(), self.expand); self.evs = Some(Box::pin(filtered)); } None => {} @@ -221,11 +224,14 @@ impl Stream for EventChunkerMultifile { self.expand, self.do_decompress, ); - chunkers.push(chunker); + let chunker = chunker + //.map(|x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) + ; + chunkers.push(Box::pin(chunker) as _); } } - let merged = MergedStream::new(chunkers); - let filtered = RangeFilter::new(merged, self.range.clone(), self.expand); + let merged = Merger::new(chunkers, self.out_max_len); + let filtered = RangeFilter2::new(merged, self.range.clone(), self.expand); self.evs = Some(Box::pin(filtered)); Ready(Some(Ok(StreamItem::Log(item)))) } @@ -272,7 +278,7 @@ mod test { use netpod::DiskIoTune; use netpod::Nanos; use 
streams::eventchunker::EventChunkerConf; - use streams::rangefilter::RangeFilter; + use streams::rangefilter2::RangeFilter2; fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec), Error> { let chn = netpod::Channel { @@ -308,10 +314,12 @@ mod test { event_chunker_conf, true, true, + // TODO do asserts depend on this? + 32, ); //let mut events = MergedStream::new(vec![events], range.clone(), true); - let mut events = RangeFilter::new(events, range.clone(), true); - let mut tss = vec![]; + let mut events = RangeFilter2::new(events, range.clone(), true); + let mut tss = Vec::new(); while let Some(item) = events.next().await { match item { Ok(item) => match item { diff --git a/disk/src/merge.rs b/disk/src/merge.rs index e8e83ca..3c8d376 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,447 +1 @@ pub mod mergedblobsfromremotes; - -use err::Error; -use futures_util::Stream; -use futures_util::StreamExt; -use items::Appendable; -use items::ByteEstimate; -use items::PushableIndex; -use items::WithTimestamps; -use items_0::streamitem::LogItem; -use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::Sitemty; -use items_0::streamitem::StatsItem; -use items_0::streamitem::StreamItem; -use netpod::histo::HistoLog2; -use netpod::log::*; -use netpod::ByteSize; -use std::collections::VecDeque; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -const LOG_EMIT_ITEM: bool = false; - -enum MergedCurVal { - None, - Finish, - Val(T), -} - -pub struct MergedStream { - inps: Vec, - current: Vec>, - ixs: Vec, - errored: bool, - completed: bool, - batch: Option, - ts_last_emit: u64, - range_complete_observed: Vec, - range_complete_observed_all: bool, - range_complete_observed_all_emitted: bool, - data_emit_complete: bool, - batch_size: ByteSize, - batch_len_emit_histo: HistoLog2, - logitems: VecDeque, - stats_items: VecDeque, -} - -impl Drop for MergedStream { - fn drop(&mut self) { - // TODO collect somewhere - debug!( - "MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}", - self.batch_len_emit_histo - ); - } -} - -impl MergedStream -where - S: Stream> + Unpin, - ITY: Appendable + Unpin, -{ - pub fn new(inps: Vec) -> Self { - trace!("MergedStream::new"); - let n = inps.len(); - let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect(); - Self { - inps, - current: current, - ixs: vec![0; n], - errored: false, - completed: false, - batch: None, - ts_last_emit: 0, - range_complete_observed: vec![false; n], - range_complete_observed_all: false, - range_complete_observed_all_emitted: false, - data_emit_complete: false, - batch_size: ByteSize::kb(128), - batch_len_emit_histo: HistoLog2::new(0), - logitems: VecDeque::new(), - stats_items: VecDeque::new(), - } - } - - fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - let mut pending = 0; - for i1 in 0..self.inps.len() { - match self.current[i1] { - MergedCurVal::None => { - 'l1: loop { - break match self.inps[i1].poll_next_unpin(cx) { - Ready(Some(Ok(k))) => match k { - StreamItem::Log(item) => { - self.logitems.push_back(item); - continue 'l1; - } - StreamItem::Stats(item) => { - self.stats_items.push_back(item); - continue 'l1; - } - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - self.range_complete_observed[i1] = true; - let d = self.range_complete_observed.iter().filter(|&&k| k).count(); - if d == self.range_complete_observed.len() { - self.range_complete_observed_all = true; - 
debug!("MergedStream range_complete d {} COMPLETE", d); - } else { - trace!("MergedStream range_complete d {}", d); - } - continue 'l1; - } - RangeCompletableItem::Data(item) => { - self.ixs[i1] = 0; - self.current[i1] = MergedCurVal::Val(item); - } - }, - }, - Ready(Some(Err(e))) => { - // TODO emit this error, consider this stream as done, anything more to do here? - //self.current[i1] = CurVal::Err(e); - self.errored = true; - return Ready(Err(e)); - } - Ready(None) => { - self.current[i1] = MergedCurVal::Finish; - } - Pending => { - pending += 1; - } - }; - } - } - _ => (), - } - } - if pending > 0 { - Pending - } else { - Ready(Ok(())) - } - } -} - -impl Stream for MergedStream -where - S: Stream> + Unpin, - ITY: PushableIndex + Appendable + ByteEstimate + WithTimestamps + Unpin, -{ - type Item = Sitemty; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - let span = netpod::log::span!(Level::INFO, "disk::merge"); - let _spg = span.enter(); - 'outer: loop { - break if self.completed { - panic!("poll_next on completed"); - } else if self.errored { - self.completed = true; - Ready(None) - } else if let Some(item) = self.logitems.pop_front() { - Ready(Some(Ok(StreamItem::Log(item)))) - } else if let Some(item) = self.stats_items.pop_front() { - Ready(Some(Ok(StreamItem::Stats(item)))) - } else if self.range_complete_observed_all_emitted { - self.completed = true; - Ready(None) - } else if self.data_emit_complete { - if self.range_complete_observed_all { - if self.range_complete_observed_all_emitted { - self.completed = true; - Ready(None) - } else { - self.range_complete_observed_all_emitted = true; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } - } else { - self.completed = true; - Ready(None) - } - } else { - match self.replenish(cx) { - Ready(Ok(_)) => { - let mut lowest_ix = usize::MAX; - let mut lowest_ts = u64::MAX; - for i1 in 0..self.inps.len() { - if let MergedCurVal::Val(val) = &self.current[i1] { - let u = self.ixs[i1]; - if u >= val.len() { - self.ixs[i1] = 0; - self.current[i1] = MergedCurVal::None; - continue 'outer; - } else { - let ts = val.ts(u); - if ts < lowest_ts { - lowest_ix = i1; - lowest_ts = ts; - } - } - } - } - if lowest_ix == usize::MAX { - if let Some(batch) = self.batch.take() { - if batch.len() != 0 { - self.batch_len_emit_histo.ingest(batch.len() as u32); - self.data_emit_complete = true; - if LOG_EMIT_ITEM { - let mut aa = vec![]; - for ii in 0..batch.len() { - aa.push(batch.ts(ii)); - } - debug!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa); - }; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch))))) - } else { - self.data_emit_complete = true; - continue 'outer; - } - } else { - self.data_emit_complete = true; - continue 'outer; - } - } else { - // TODO unordered cases - if lowest_ts < self.ts_last_emit { - self.errored = true; - let msg = format!( - "unordered event at lowest_ts {} ts_last_emit {}", - lowest_ts, self.ts_last_emit - ); - return Ready(Some(Err(Error::with_public_msg(msg)))); - } else { - self.ts_last_emit = self.ts_last_emit.max(lowest_ts); - } - { - let batch = self.batch.take(); - let rix = self.ixs[lowest_ix]; - match &self.current[lowest_ix] { - MergedCurVal::Val(val) => { - let mut ldst = batch.unwrap_or_else(|| val.empty_like_self()); - if false { - info!( - "Push event rix {} lowest_ix {} lowest_ts {}", - rix, lowest_ix, lowest_ts - ); - } - ldst.push_index(val, rix); - self.batch = Some(ldst); - } - MergedCurVal::None => 
panic!(), - MergedCurVal::Finish => panic!(), - } - } - self.ixs[lowest_ix] += 1; - let curlen = match &self.current[lowest_ix] { - MergedCurVal::Val(val) => val.len(), - MergedCurVal::None => panic!(), - MergedCurVal::Finish => panic!(), - }; - if self.ixs[lowest_ix] >= curlen { - self.ixs[lowest_ix] = 0; - self.current[lowest_ix] = MergedCurVal::None; - } - let emit_packet_now = if let Some(batch) = &self.batch { - if batch.byte_estimate() >= self.batch_size.bytes() as u64 { - true - } else { - false - } - } else { - false - }; - if emit_packet_now { - if let Some(batch) = self.batch.take() { - trace!("emit item because over threshold len {}", batch.len()); - self.batch_len_emit_histo.ingest(batch.len() as u32); - if LOG_EMIT_ITEM { - let mut aa = vec![]; - for ii in 0..batch.len() { - aa.push(batch.ts(ii)); - } - debug!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa); - }; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch))))) - } else { - continue 'outer; - } - } else { - continue 'outer; - } - } - } - Ready(Err(e)) => { - self.errored = true; - Ready(Some(Err(e))) - } - Pending => Pending, - } - }; - } - } -} - -#[cfg(test)] -mod test { - use crate::dataopen::position_file_for_test; - use crate::file_content_stream; - use crate::merge::MergedStream; - use err::Error; - use futures_util::StreamExt; - use items_0::streamitem::RangeCompletableItem; - use items_0::streamitem::StreamItem; - use netpod::log::*; - use netpod::test_data_base_path_databuffer; - use netpod::timeunits::DAY; - use netpod::timeunits::MS; - use netpod::ByteOrder; - use netpod::ByteSize; - use netpod::Channel; - use netpod::ChannelConfig; - use netpod::NanoRange; - use netpod::Nanos; - use netpod::ScalarType; - use netpod::Shape; - use std::path::PathBuf; - use streams::eventchunker::EventChunker; - use streams::eventchunker::EventChunkerConf; - - fn scalar_file_path() -> PathBuf { - test_data_base_path_databuffer() - .join("node00/ks_2/byTime/scalar-i32-be") - .join("0000000000000000001/0000000000/0000000000086400000_00000_Data") - } - - #[allow(unused)] - fn wave_file_path() -> PathBuf { - test_data_base_path_databuffer() - .join("node00/ks_3/byTime/wave-f64-be-n21") - .join("0000000000000000001/0000000000/0000000000086400000_00000_Data") - } - - #[derive(Debug)] - struct CollectedEvents { - tss: Vec, - } - - async fn collect_merged_events(paths: Vec, range: NanoRange) -> Result { - let mut files = Vec::new(); - for path in paths { - let p = position_file_for_test(&path, &range, false, false).await?; - if !p.found { - return Err(Error::with_msg_no_trace("can not position file??")); - } - let file = p - .file - .file - .ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?; - files.push((path, file)); - } - let inps = files - .into_iter() - .map(|(path, file)| { - let disk_io_tune = netpod::DiskIoTune::default(); - let inp = file_content_stream(path, file, disk_io_tune); - inp - }) - .map(|inp| { - let channel_config = ChannelConfig { - channel: Channel { - backend: "testbackend".into(), - name: "scalar-i32-be".into(), - series: None, - }, - keyspace: 2, - time_bin_size: Nanos { ns: DAY }, - scalar_type: ScalarType::I32, - byte_order: ByteOrder::Big, - array: false, - compression: false, - shape: Shape::Scalar, - }; - let stats_conf = EventChunkerConf { - disk_stats_every: ByteSize::kb(1024), - }; - let expand = false; - let do_decompress = false; - let dbg_path = PathBuf::from("/dbg/dummy"); - - // TODO `expand` flag usage - // Does Chunker need to know about 
`expand` and why? - - let chunker = EventChunker::from_event_boundary( - Box::pin(inp), - channel_config, - range.clone(), - stats_conf, - dbg_path.clone(), - expand, - do_decompress, - ); - chunker - }) - .collect(); - let mut merged = MergedStream::new(inps); - let mut cevs = CollectedEvents { tss: vec![] }; - let mut i1 = 0; - // TODO assert more - while let Some(item) = merged.next().await { - if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item { - debug!("item: {:?}", item); - for ts in item.tss { - cevs.tss.push(ts); - } - i1 += 1; - } - if i1 >= 10 { - break; - } - } - debug!("read {} data items", i1); - debug!("cevs: {:?}", cevs); - Ok(cevs) - } - - #[test] - fn single_file_through_merger() -> Result<(), Error> { - let fut = async { - let range = NanoRange { - beg: DAY + MS * 1501, - end: DAY + MS * 4000, - }; - let path = scalar_file_path(); - collect_merged_events(vec![path], range).await?; - - // TODO - // assert things - // remove zmtp test from default test suite, move to cli instead - - Ok(()) - }; - taskrun::run(fut) - } -} diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index bd22b64..c4f6f71 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -1,10 +1,10 @@ -use crate::merge::MergedStream; use err::Error; use futures_util::pin_mut; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::Sitemty; use items_2::eventfull::EventFull; +use items_2::merger::Merger; use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::Cluster; @@ -29,7 +29,7 @@ pub struct MergedBlobsFromRemotes { impl MergedBlobsFromRemotes { pub fn new(evq: PlainEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { debug!("MergedBlobsFromRemotes evq {:?}", evq); - let mut tcp_establish_futs = vec![]; + let mut tcp_establish_futs = Vec::new(); for node in &cluster.nodes { let f = x_processed_event_blobs_stream_from_node(evq.clone(), perf_opts.clone(), node.clone()); let f: T002 = Box::pin(f); @@ -97,8 +97,9 @@ impl Stream for MergedBlobsFromRemotes { Pending } else { if c1 == self.tcp_establish_futs.len() { - let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); - let s1 = MergedStream::new(inps); + let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); + // TODO set out_max_len dynamically + let s1 = Merger::new(inps, 128); self.merged = Some(Box::pin(s1)); } continue 'outer; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index ffdcc08..0b2b8c4 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -104,6 +104,13 @@ pub async fn make_event_pipe( need_expand = evq.one_before_range() ); let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + // TODO should not need this for correctness. + // Should limit based on return size and latency. + let out_max_len = if node_config.node_config.cluster.is_central_storage { + 1 + } else { + 128 + }; let event_blobs = EventChunkerMultifile::new( (&range).try_into()?, channel_config.clone(), @@ -113,6 +120,7 @@ pub async fn make_event_pipe( event_chunker_conf, evq.one_before_range(), true, + out_max_len, ); let shape = entry.to_shape()?; error!("TODO replace AggKind in the called code"); @@ -171,6 +179,13 @@ pub fn make_local_event_blobs_stream( array: entry.is_array, compression: entry.is_compressed, }; + // TODO should not need this for correctness. + // Should limit based on return size and latency. 
+ let out_max_len = if node_config.node_config.cluster.is_central_storage { + 1 + } else { + 128 + }; let event_blobs = EventChunkerMultifile::new( range, channel_config.clone(), @@ -180,6 +195,7 @@ pub fn make_local_event_blobs_stream( event_chunker_conf, expand, do_decompress, + out_max_len, ); Ok(event_blobs) } @@ -209,6 +225,13 @@ pub fn make_remote_event_blobs_stream( array: entry.is_array, compression: entry.is_compressed, }; + // TODO should not need this for correctness. + // Should limit based on return size and latency. + let out_max_len = if node_config.node_config.cluster.is_central_storage { + 1 + } else { + 128 + }; let event_blobs = EventChunkerMultifile::new( range, channel_config.clone(), @@ -218,6 +241,7 @@ pub fn make_remote_event_blobs_stream( event_chunker_conf, expand, do_decompress, + out_max_len, ); Ok(event_blobs) } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 50f1721..0a2caed 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -6,6 +6,8 @@ use crate::BodyStream; use crate::ReqCtx; use bytes::BufMut; use bytes::BytesMut; +use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes; +use disk::raw::conn::make_local_event_blobs_stream; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; @@ -820,13 +822,7 @@ impl Stream for DataApiPython3DataStream { }; let channel = self.channels[self.chan_ix - 1].clone(); debug!("found channel_config for {}: {:?}", channel.name, entry); - let evq = PlainEventsQuery::new( - channel, - self.range.clone(), - Some(netpod::AggKind::EventBlobs), - Some(Duration::from_millis(600000)), - None, - ); + let evq = PlainEventsQuery::new(channel, self.range.clone()).for_event_blobs(); info!("query for event blobs retrieval: evq {evq:?}"); warn!("fix magic inmem_bufcap"); let perf_opts = PerfOpts::default(); @@ -835,8 +831,8 @@ impl Stream for DataApiPython3DataStream { info!("Set up central storage stream"); // TODO pull up this config let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let s = disk::raw::conn::make_local_event_blobs_stream( - evq.range().clone(), + let s = make_local_event_blobs_stream( + evq.range().try_into()?, evq.channel().clone(), &entry, evq.one_before_range(), @@ -853,7 +849,7 @@ impl Stream for DataApiPython3DataStream { } } debug!("Set up merged remote stream"); - let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new( + let s = MergedBlobsFromRemotes::new( evq, perf_opts, self.node_config.node_config.cluster.clone(), diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index 8a6e5dc..1816e4f 100644 --- a/httpret/src/api4/binned.rs +++ b/httpret/src/api4/binned.rs @@ -31,8 +31,8 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache let span1 = span!( Level::INFO, "httpret::binned", - beg = query.range().beg / SEC, - end = query.range().end / SEC, + beg = query.range().beg_u64() / SEC, + end = query.range().end_u64() / SEC, ch = query.channel().name(), ); span1.in_scope(|| { diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 9bdb36e..91863a6 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -35,7 +35,7 @@ use std::collections::BTreeMap; use url::Url; pub async fn chconf_from_events_v1(q: &PlainEventsQuery, ncc: &NodeConfigCached) -> Result { - let ret = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), ncc).await?; + let ret = nodenet::channelconfig::channel_config(q.range().try_into()?, 
q.channel().clone(), ncc).await?;
     Ok(ret)
 }
 
@@ -54,7 +54,7 @@
 }
 
 pub async fn chconf_from_binned(q: &BinnedQuery, ncc: &NodeConfigCached) -> Result {
-    let ret = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), ncc).await?;
+    let ret = nodenet::channelconfig::channel_config(q.range().try_into()?, q.channel().clone(), ncc).await?;
     Ok(ret)
 }
diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs
index 90944a1..af848b3 100644
--- a/httpret/src/httpret.rs
+++ b/httpret/src/httpret.rs
@@ -32,7 +32,6 @@ use hyper::Response;
 use net::SocketAddr;
 use netpod::log::*;
 use netpod::query::prebinned::PreBinnedQuery;
-use netpod::timeunits::SEC;
 use netpod::NodeConfigCached;
 use netpod::ProxyConfig;
 use netpod::APP_JSON;
@@ -464,15 +463,11 @@ async fn prebinned_inner(
     let (head, _body) = req.into_parts();
     let url: url::Url = format!("dummy://{}", head.uri).parse()?;
     let query = PreBinnedQuery::from_url(&url)?;
-    let desc = format!(
-        "pre-W-{}-B-{}",
-        query.patch().bin_t_len() / SEC,
-        query.patch().patch_beg() / SEC
-    );
-    let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str());
+    let span1 = span!(Level::INFO, "httpret::prebinned", desc = &query.patch().span_desc());
     span1.in_scope(|| {
         debug!("begin");
     });
+    error!("TODO httpret prebinned_inner");
     //let fut = disk::binned::prebinned::pre_binned_bytes_for_http(node_config, &query).instrument(span1);
     todo!()
 }
diff --git a/items/src/lib.rs b/items/src/lib.rs
index 58a62f9..4d7b66b 100644
--- a/items/src/lib.rs
+++ b/items/src/lib.rs
@@ -1,12 +1,7 @@
-pub mod streams;
-
 use err::Error;
 use items_0::streamitem::RangeCompletableItem;
 use items_0::streamitem::StreamItem;
-#[allow(unused)]
-use netpod::log::*;
 use netpod::NanoRange;
-use netpod::Shape;
 use std::future::Future;
 use std::marker::PhantomData;
 use std::pin::Pin;
@@ -16,20 +11,6 @@ use tokio::fs::File;
 use tokio::io::AsyncRead;
 use tokio::io::ReadBuf;
 
-pub enum Fits {
-    Empty,
-    Lower,
-    Greater,
-    Inside,
-    PartlyLower,
-    PartlyGreater,
-    PartlyLowerAndGreater,
-}
-
-pub trait WithLen {
-    fn len(&self) -> usize;
-}
-
 pub trait WithTimestamps {
     fn ts(&self, ix: usize) -> u64;
 }
@@ -45,38 +26,6 @@ pub trait RangeOverlapInfo {
     fn starts_after(&self, range: NanoRange) -> bool;
 }
 
-pub trait FitsInside {
-    fn fits_inside(&self, range: NanoRange) -> Fits;
-}
-
-pub trait FilterFittingInside: Sized {
-    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self>;
-}
-
-pub trait PushableIndex {
-    // TODO get rid of usage, involves copy.
-    // TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop?
-    fn push_index(&mut self, src: &Self, ix: usize);
-}
-
-pub trait NewEmpty {
-    fn empty(shape: Shape) -> Self;
-}
-
-pub trait Appendable: WithLen {
-    fn empty_like_self(&self) -> Self;
-
-    // TODO get rid of usage, involves copy.
- fn append(&mut self, src: &Self); - - // TODO the `ts2` makes no sense for non-bin-implementors - fn append_zero(&mut self, ts1: u64, ts2: u64); -} - -pub trait Clearable { - fn clear(&mut self); -} - pub trait EventAppendable where Self: Sized, @@ -85,20 +34,15 @@ where fn append_event(ret: Option, ts: u64, pulse: u64, value: Self::Value) -> Self; } -pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside { - fn ts1s(&self) -> &Vec; - fn ts2s(&self) -> &Vec; -} - // TODO should get I/O and tokio dependence out of this crate -pub trait ReadableFromFile: Sized { +trait ReadableFromFile: Sized { fn read_from_file(file: File) -> Result, Error>; // TODO should not need this: fn from_buf(buf: &[u8]) -> Result; } // TODO should get I/O and tokio dependence out of this crate -pub struct ReadPbv +struct ReadPbv where T: ReadableFromFile, { @@ -112,7 +56,7 @@ impl ReadPbv where T: ReadableFromFile, { - pub fn new(file: File) -> Self { + fn new(file: File) -> Self { Self { // TODO make buffer size a parameter: buf: vec![0; 1024 * 32], diff --git a/items/src/streams.rs b/items/src/streams.rs deleted file mode 100644 index c18d002..0000000 --- a/items/src/streams.rs +++ /dev/null @@ -1,152 +0,0 @@ -use crate::RangeCompletableItem; -use crate::StreamItem; -use crate::WithLen; -use err::Error; -use futures_util::Stream; -use futures_util::StreamExt; -use items_0::streamitem::Sitemty; -use items_0::streamitem::StatsItem; -use netpod::log::*; -use netpod::DiskStats; -use serde::Serialize; -use serde_json::Value as JsonValue; -use std::fmt; -use std::time::Duration; -use tokio::time::timeout_at; - -pub trait Collector: Send + Unpin + WithLen { - type Input: Collectable; - type Output: Serialize; - fn ingest(&mut self, src: &Self::Input); - fn set_range_complete(&mut self); - fn set_timed_out(&mut self); - fn result(self) -> Result; -} - -pub trait Collectable { - type Collector: Collector; - fn new_collector(bin_count_exp: u32) -> Self::Collector; -} - -pub trait ToJsonBytes { - fn to_json_bytes(&self) -> Result, Error>; -} - -pub trait ToJsonResult { - fn to_json_result(&self) -> Result, Error>; -} - -impl ToJsonBytes for serde_json::Value { - fn to_json_bytes(&self) -> Result, Error> { - Ok(serde_json::to_vec(self)?) - } -} - -impl ToJsonResult for Sitemty { - fn to_json_result(&self) -> Result, Error> { - match self { - Ok(item) => match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::Data(item) => Ok(Box::new(item.clone())), - RangeCompletableItem::RangeComplete => Err(Error::with_msg("RangeComplete")), - }, - StreamItem::Log(item) => Err(Error::with_msg(format!("Log {:?}", item))), - StreamItem::Stats(item) => Err(Error::with_msg(format!("Stats {:?}", item))), - }, - Err(e) => Err(Error::with_msg(format!("Error {:?}", e))), - } - } -} - -// TODO rename, it is also used for binned: -pub async fn collect_plain_events_json( - stream: S, - timeout: Duration, - bin_count_exp: u32, - events_max: u64, - do_log: bool, -) -> Result -where - S: Stream> + Unpin, - T: Collectable + fmt::Debug, -{ - let deadline = tokio::time::Instant::now() + timeout; - // TODO in general a Collector does not need to know about the expected number of bins. - // It would make more sense for some specific Collector kind to know. - // Therefore introduce finer grained types. 
- let mut collector = ::new_collector(bin_count_exp); - let mut i1 = 0; - let mut stream = stream; - let mut total_duration = Duration::ZERO; - loop { - let item = if i1 == 0 { - stream.next().await - } else { - if false { - None - } else { - match timeout_at(deadline, stream.next()).await { - Ok(k) => k, - Err(_) => { - collector.set_timed_out(); - None - } - } - } - }; - match item { - Some(item) => { - match item { - Ok(item) => match item { - StreamItem::Log(item) => { - if do_log { - debug!("collect_plain_events_json log {:?}", item); - } - } - StreamItem::Stats(item) => { - match item { - // TODO factor and simplify the stats collection: - StatsItem::EventDataReadStats(_) => {} - StatsItem::RangeFilterStats(_) => {} - StatsItem::DiskStats(item) => match item { - DiskStats::OpenStats(k) => { - total_duration += k.duration; - } - DiskStats::SeekStats(k) => { - total_duration += k.duration; - } - DiskStats::ReadStats(k) => { - total_duration += k.duration; - } - DiskStats::ReadExactStats(k) => { - total_duration += k.duration; - } - }, - } - } - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - collector.set_range_complete(); - } - RangeCompletableItem::Data(item) => { - collector.ingest(&item); - i1 += 1; - if i1 >= events_max { - break; - } - } - }, - }, - Err(e) => { - // TODO Need to use some flags to get good enough error message for remote user. - Err(e)?; - } - }; - } - None => break, - } - } - let ret = serde_json::to_value(collector.result()?)?; - debug!("Total duration: {:?}", total_duration); - Ok(ret) -} diff --git a/items_0/src/collect_c.rs b/items_0/src/collect_c.rs deleted file mode 100644 index a0c58f5..0000000 --- a/items_0/src/collect_c.rs +++ /dev/null @@ -1,115 +0,0 @@ -use crate::collect_s::ToJsonBytes; -use crate::collect_s::ToJsonResult; -use crate::AsAnyMut; -use crate::AsAnyRef; -use crate::Events; -use crate::WithLen; -use err::Error; -use netpod::BinnedRangeEnum; -use netpod::SeriesRange; -use std::fmt; - -pub trait Collector: fmt::Debug + Send { - fn len(&self) -> usize; - fn ingest(&mut self, item: &mut dyn Collectable); - fn set_range_complete(&mut self); - fn set_timed_out(&mut self); - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, Error>; -} - -pub trait Collectable: fmt::Debug + AsAnyMut + crate::WithLen { - fn new_collector(&self) -> Box; -} - -pub trait Collected: fmt::Debug + ToJsonResult + AsAnyRef + Send {} - -erased_serde::serialize_trait_object!(Collected); - -impl ToJsonResult for Box { - fn to_json_result(&self) -> Result, Error> { - self.as_ref().to_json_result() - } -} - -impl Collected for Box {} - -#[derive(Debug)] -pub struct CollectorDynDefault {} - -// TODO remove? 
-pub trait CollectorDyn: fmt::Debug + Send { - fn len(&self) -> usize; - - fn ingest(&mut self, item: &mut dyn CollectableWithDefault); - - fn set_range_complete(&mut self); - - fn set_timed_out(&mut self); - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, Error>; -} - -pub trait CollectableWithDefault: AsAnyMut { - fn new_collector(&self) -> Box; -} - -impl crate::WithLen for Box { - fn len(&self) -> usize { - self.as_ref().len() - } -} - -impl Collectable for Box { - fn new_collector(&self) -> Box { - todo!() - } -} - -#[derive(Debug)] -pub struct TimeBinnedCollector {} - -impl Collector for TimeBinnedCollector { - fn len(&self) -> usize { - todo!() - } - - fn ingest(&mut self, _item: &mut dyn Collectable) { - todo!() - } - - fn set_range_complete(&mut self) { - todo!() - } - - fn set_timed_out(&mut self) { - todo!() - } - - fn result( - &mut self, - _range: Option, - _binrange: Option, - ) -> Result, Error> { - todo!() - } -} - -impl WithLen for Box { - fn len(&self) -> usize { - self.as_ref().len() - } -} - -impl Collectable for Box { - fn new_collector(&self) -> Box { - self.as_ref().new_collector() - } -} diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs index fc18e49..43cdec4 100644 --- a/items_0/src/collect_s.rs +++ b/items_0/src/collect_s.rs @@ -1,82 +1,15 @@ -use super::collect_c::Collected; use crate::AsAnyMut; use crate::AsAnyRef; +use crate::Events; use crate::WithLen; use err::Error; +use netpod::log::*; use netpod::BinnedRangeEnum; use netpod::SeriesRange; use serde::Serialize; use std::any::Any; use std::fmt; -// TODO rename to `Typed` -pub trait CollectorType: Send + Unpin + WithLen { - type Input: Collectable; - type Output: Collected + ToJsonResult + Serialize; - - fn ingest(&mut self, src: &mut Self::Input); - fn set_range_complete(&mut self); - fn set_timed_out(&mut self); - - // TODO use this crate's Error instead: - fn result(&mut self, range: Option, binrange: Option) -> Result; -} - -pub trait Collector: Send + Unpin + WithLen { - fn ingest(&mut self, src: &mut dyn Collectable); - fn set_range_complete(&mut self); - fn set_timed_out(&mut self); - // TODO factor the required parameters into new struct? Generic over events or binned? 
- fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, Error>; -} - -// TODO rename to `Typed` -pub trait CollectableType: AsAnyRef + AsAnyMut { - type Collector: CollectorType; - fn new_collector() -> Self::Collector; -} - -pub trait Collectable: AsAnyRef + AsAnyMut + Any { - fn new_collector(&self) -> Box; -} - -impl Collector for T { - fn ingest(&mut self, src: &mut dyn Collectable) { - let src: &mut ::Input = src.as_any_mut().downcast_mut().expect("can not downcast"); - T::ingest(self, src) - } - - fn set_range_complete(&mut self) { - T::set_range_complete(self) - } - - fn set_timed_out(&mut self) { - T::set_timed_out(self) - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, Error> { - let ret = T::result(self, range, binrange)?; - Ok(Box::new(ret) as _) - } -} - -impl Collectable for T -where - T: CollectableType + 'static, -{ - fn new_collector(&self) -> Box { - Box::new(T::new_collector()) as _ - } -} - // TODO check usage of this trait pub trait ToJsonBytes { fn to_json_bytes(&self) -> Result, Error>; @@ -113,9 +46,114 @@ impl ToJsonBytes for serde_json::Value { } } +pub trait Collected: fmt::Debug + ToJsonResult + AsAnyRef + Send {} + +erased_serde::serialize_trait_object!(Collected); + +impl ToJsonResult for Box { + fn to_json_result(&self) -> Result, Error> { + self.as_ref().to_json_result() + } +} + +impl Collected for Box {} + +// TODO rename to `Typed` +pub trait CollectorType: fmt::Debug + Send + Unpin + WithLen { + type Input: Collectable; + type Output: Collected + ToJsonResult + Serialize; + + fn ingest(&mut self, src: &mut Self::Input); + fn set_range_complete(&mut self); + fn set_timed_out(&mut self); + + // TODO use this crate's Error instead: + fn result(&mut self, range: Option, binrange: Option) -> Result; +} + +pub trait Collector: fmt::Debug + Send + Unpin + WithLen { + fn ingest(&mut self, src: &mut dyn Collectable); + fn set_range_complete(&mut self); + fn set_timed_out(&mut self); + // TODO factor the required parameters into new struct? Generic over events or binned? 
+ fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error>; +} + +impl Collector for T +where + T: fmt::Debug + CollectorType + 'static, +{ + fn ingest(&mut self, src: &mut dyn Collectable) { + let x = src.as_any_mut().downcast_mut(); + if x.is_none() { + warn!("TODO handle the case of incoming Box"); + } + let src: &mut ::Input = x.expect("can not downcast"); + T::ingest(self, src) + } + + fn set_range_complete(&mut self) { + T::set_range_complete(self) + } + + fn set_timed_out(&mut self) { + T::set_timed_out(self) + } + + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error> { + let ret = T::result(self, range, binrange)?; + Ok(Box::new(ret)) + } +} + +// TODO rename to `Typed` +pub trait CollectableType: fmt::Debug + AsAnyRef + AsAnyMut { + type Collector: CollectorType; + fn new_collector() -> Self::Collector; +} + +pub trait Collectable: fmt::Debug + AsAnyRef + AsAnyMut { + fn new_collector(&self) -> Box; +} + +impl Collectable for Box { + fn new_collector(&self) -> Box { + self.as_ref().new_collector() + } +} + +impl Collectable for T +where + T: CollectableType + 'static, +{ + fn new_collector(&self) -> Box { + Box::new(T::new_collector()) + } +} + // TODO do this with some blanket impl: impl Collectable for Box { fn new_collector(&self) -> Box { Collectable::new_collector(self.as_ref()) } } + +impl WithLen for Box { + fn len(&self) -> usize { + self.as_ref().len() + } +} + +impl Collectable for Box { + fn new_collector(&self) -> Box { + self.as_ref().new_collector() + } +} diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 82dc59d..7400e21 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -1,4 +1,3 @@ -pub mod collect_c; pub mod collect_s; pub mod framable; pub mod isodate; @@ -11,12 +10,9 @@ pub mod bincode { pub use bincode::*; } -use collect_c::CollectableWithDefault; use collect_s::Collectable; use collect_s::ToJsonResult; use netpod::BinnedRangeEnum; -use netpod::Dim0Kind; -use netpod::NanoRange; use netpod::ScalarType; use netpod::SeriesRange; use netpod::Shape; @@ -102,7 +98,7 @@ where } /// Data in time-binned form. -pub trait TimeBinned: Any + TimeBinnable + crate::collect_c::Collectable + erased_serde::Serialize { +pub trait TimeBinned: Any + TimeBinnable + Collectable + erased_serde::Serialize { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; fn as_collectable_mut(&mut self) -> &mut dyn Collectable; fn edges_slice(&self) -> (&[u64], &[u64]); @@ -159,23 +155,14 @@ impl From for err::Error { /// Container of some form of events, for use as trait object. pub trait Events: - fmt::Debug - + TypeName - + Any - + Collectable - + CollectableWithDefault - + TimeBinnable - + WithLen - + Send - + erased_serde::Serialize - + EventsNonObj + fmt::Debug + TypeName + Any + Collectable + TimeBinnable + WithLen + Send + erased_serde::Serialize + EventsNonObj { fn as_time_binnable(&self) -> &dyn TimeBinnable; fn verify(&self) -> bool; fn output_info(&self); fn as_collectable_mut(&mut self) -> &mut dyn Collectable; - fn as_collectable_with_default_ref(&self) -> &dyn CollectableWithDefault; - fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableWithDefault; + fn as_collectable_with_default_ref(&self) -> &dyn Collectable; + fn as_collectable_with_default_mut(&mut self) -> &mut dyn Collectable; fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; // TODO is this used? 
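The new blanket `impl Collector for T` in `collect_s.rs` above is the heart of folding the old `collect_c` module into `collect_s`: a typed `CollectorType` becomes usable as the object-safe `Collector` by downcasting the incoming `dyn Collectable`. A self-contained toy version of that bridge, with stand-in traits (not the real `items_0` definitions):

```rust
use std::any::Any;

// Stand-in traits for illustration; the real ones live in items_0::collect_s.
trait Collectable: Any {
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

trait Collector {
    fn ingest(&mut self, src: &mut dyn Collectable);
}

trait CollectorType {
    type Input: Collectable;
    fn ingest(&mut self, src: &mut Self::Input);
}

// The bridge: every typed collector automatically implements the dyn-safe
// trait. A failed downcast corresponds to the warn! path in the real impl
// (e.g. a Box<dyn Events> arriving instead of the concrete input type).
impl<T> Collector for T
where
    T: CollectorType + 'static,
{
    fn ingest(&mut self, src: &mut dyn Collectable) {
        match src.as_any_mut().downcast_mut::<T::Input>() {
            Some(src) => CollectorType::ingest(self, src),
            None => eprintln!("TODO handle the case of incoming Box<dyn Events>"),
        }
    }
}

// Minimal concrete pair to show the bridge in action.
struct Counts(usize);

impl Collectable for Counts {
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

struct CountsCollector(usize);

impl CollectorType for CountsCollector {
    type Input = Counts;
    fn ingest(&mut self, src: &mut Counts) {
        self.0 += src.0;
    }
}

fn main() {
    let mut coll = CountsCollector(0);
    let mut item = Counts(3);
    // Call through the object-safe interface; the blanket impl downcasts.
    Collector::ingest(&mut coll, &mut item);
    assert_eq!(coll.0, 3);
}
```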
@@ -193,6 +180,12 @@ pub trait Events: fn pulses(&self) -> &VecDeque; } +impl WithLen for Box { + fn len(&self) -> usize { + self.as_ref().len() + } +} + pub trait EventsNonObj { fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque); } diff --git a/items_2/src/binnedcollected.rs b/items_2/src/binnedcollected.rs index 7a6d873..1bc0652 100644 --- a/items_2/src/binnedcollected.rs +++ b/items_2/src/binnedcollected.rs @@ -5,6 +5,7 @@ use crate::Error; use futures_util::Future; use futures_util::Stream; use futures_util::StreamExt; +use items_0::collect_s::Collected; use items_0::collect_s::Collector; use items_0::collect_s::ToJsonResult; use items_0::streamitem::RangeCompletableItem; @@ -62,7 +63,7 @@ fn flush_binned( pub struct BinnedCollectedResult { pub range_final: bool, pub did_timeout: bool, - pub result: Box, + pub result: Box, } fn _old_binned_collected( diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index db2436d..3d65ce0 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -1,14 +1,11 @@ -use crate::ts_offs_from_abs; -use crate::ts_offs_from_abs_with_anchor; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; -use chrono::TimeZone; -use chrono::Utc; use err::Error; use items_0::collect_s::Collectable; use items_0::collect_s::CollectableType; +use items_0::collect_s::Collected; use items_0::collect_s::CollectorType; use items_0::collect_s::ToJsonResult; use items_0::scalar_ops::ScalarOps; @@ -23,7 +20,6 @@ use items_0::TimeBins; use items_0::WithLen; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::Dim0Kind; use netpod::SeriesRange; @@ -33,7 +29,6 @@ use std::any; use std::any::Any; use std::collections::VecDeque; use std::fmt; -use std::mem; #[allow(unused)] macro_rules! 
trace4 { @@ -338,7 +333,7 @@ where } } -impl items_0::collect_c::Collected for BinsDim0CollectedResult {} +impl Collected for BinsDim0CollectedResult {} impl BinsDim0CollectedResult { pub fn len(&self) -> usize { @@ -513,77 +508,6 @@ impl CollectableType for BinsDim0 { } } -impl items_0::collect_c::Collector for BinsDim0Collector -where - NTY: ScalarOps, -{ - fn len(&self) -> usize { - self.vals.as_ref().map_or(0, |x| x.len()) - } - - fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { - trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item); - { - { - //let tyid = item.type_id(); - //let tyid = item.as_any_mut().type_id(); - //let tyid = format!("{:?}", item.type_id().to_owned()); - trace4!("ty 0: {:40?}", (item.as_any_mut() as &dyn Any).type_id()); - } - trace4!("ty 1: {:40?}", std::any::TypeId::of::>()); - trace4!("ty 2: {:40?}", std::any::TypeId::of::>()); - trace4!("ty 3: {:40?}", std::any::TypeId::of::>>()); - trace4!( - "ty 4: {:?}", - std::any::TypeId::of::>() - ); - trace4!( - "ty 5: {:?}", - std::any::TypeId::of::<&mut dyn items_0::collect_c::Collectable>() - ); - trace4!("ty 6: {:?}", std::any::TypeId::of::>()); - } - if let Some(item) = item.as_any_mut().downcast_mut::>() { - trace4!("ingest plain"); - CollectorType::ingest(self, item) - } else if let Some(item) = item.as_any_mut().downcast_mut::>>() { - trace4!("ingest boxed"); - CollectorType::ingest(self, item) - } else if let Some(item) = item.as_any_mut().downcast_mut::>() { - trace4!("ingest boxed dyn TimeBinned"); - if let Some(item) = item.as_any_mut().downcast_mut::>() { - trace4!("ingest boxed dyn TimeBinned match"); - CollectorType::ingest(self, item) - } else { - warn!("BinsDim0Collector::ingest unexpected inner item"); - trace!("BinsDim0Collector::ingest unexpected inner item {:?}", item); - } - } else { - warn!("BinsDim0Collector::ingest unexpected item"); - trace!("BinsDim0Collector::ingest unexpected item {:?}", item); - } - } - - fn set_range_complete(&mut self) { - CollectorType::set_range_complete(self) - } - - fn set_timed_out(&mut self) { - CollectorType::set_timed_out(self) - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, Error> { - match CollectorType::result(self, range, binrange) { - Ok(res) => Ok(Box::new(res)), - Err(e) => Err(e.into()), - } - } -} - pub struct BinsDim0Aggregator { range: SeriesRange, count: u64, @@ -909,12 +833,3 @@ impl TimeBinned for BinsDim0 { self } } - -impl items_0::collect_c::Collectable for BinsDim0 -where - NTY: ScalarOps, -{ - fn new_collector(&self) -> Box { - Box::new(BinsDim0Collector::::new()) - } -} diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs index e9d1730..a36de9a 100644 --- a/items_2/src/binsxbindim0.rs +++ b/items_2/src/binsxbindim0.rs @@ -8,6 +8,7 @@ use chrono::{TimeZone, Utc}; use err::Error; use items_0::collect_s::Collectable; use items_0::collect_s::CollectableType; +use items_0::collect_s::Collected; use items_0::collect_s::CollectorType; use items_0::collect_s::ToJsonResult; use items_0::scalar_ops::ScalarOps; @@ -22,7 +23,6 @@ use items_0::TimeBins; use items_0::WithLen; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::Dim0Kind; use netpod::NanoRange; @@ -299,7 +299,7 @@ where } } -impl items_0::collect_c::Collected for BinsXbinDim0CollectedResult {} +impl Collected for BinsXbinDim0CollectedResult {} impl BinsXbinDim0CollectedResult { pub fn len(&self) -> usize { @@ -468,78 +468,6 @@ impl 
CollectableType for BinsXbinDim0 { } } -impl items_0::collect_c::Collector for BinsXbinDim0Collector -where - NTY: ScalarOps, -{ - fn len(&self) -> usize { - self.vals.len() - } - - fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { - /*trace4!("\n\n••••••••••••••••••••••••••••\nINGEST\n{:?}\n\n", item); - { - { - //let tyid = item.type_id(); - //let tyid = item.as_any_mut().type_id(); - //let tyid = format!("{:?}", item.type_id().to_owned()); - trace4!("ty 0: {:40?}", (item.as_any_mut() as &dyn Any).type_id()); - } - trace4!("ty 1: {:40?}", std::any::TypeId::of::>()); - trace4!("ty 2: {:40?}", std::any::TypeId::of::>()); - trace4!("ty 3: {:40?}", std::any::TypeId::of::>>()); - trace4!( - "ty 4: {:?}", - std::any::TypeId::of::>() - ); - trace4!( - "ty 5: {:?}", - std::any::TypeId::of::<&mut dyn items_0::collect_c::Collectable>() - ); - trace4!("ty 6: {:?}", std::any::TypeId::of::>()); - } - if let Some(item) = item.as_any_mut().downcast_mut::>() { - trace4!("ingest plain"); - CollectorType::ingest(self, item) - } else if let Some(item) = item.as_any_mut().downcast_mut::>>() { - trace4!("ingest boxed"); - CollectorType::ingest(self, item) - } else if let Some(item) = item.as_any_mut().downcast_mut::>() { - trace4!("ingest boxed dyn TimeBinned"); - if let Some(item) = item.as_any_mut().downcast_mut::>() { - trace4!("ingest boxed dyn TimeBinned match"); - CollectorType::ingest(self, item) - } else { - warn!("BinsDim0Collector::ingest unexpected inner item"); - trace!("BinsDim0Collector::ingest unexpected inner item {:?}", item); - } - } else { - warn!("BinsDim0Collector::ingest unexpected item"); - trace!("BinsDim0Collector::ingest unexpected item {:?}", item); - }*/ - todo!() - } - - fn set_range_complete(&mut self) { - CollectorType::set_range_complete(self) - } - - fn set_timed_out(&mut self) { - CollectorType::set_timed_out(self) - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, Error> { - match CollectorType::result(self, range, binrange) { - Ok(res) => Ok(Box::new(res)), - Err(e) => Err(e.into()), - } - } -} - pub struct BinsXbinDim0Aggregator { range: SeriesRange, count: u64, @@ -798,7 +726,7 @@ impl TimeBinner for BinsXbinDim0TimeBinner { fn set_range_complete(&mut self) {} - fn empty(&self) -> Box { + fn empty(&self) -> Box { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } @@ -861,12 +789,3 @@ impl TimeBinned for BinsXbinDim0 { self } } - -impl items_0::collect_c::Collectable for BinsXbinDim0 -where - NTY: ScalarOps, -{ - fn new_collector(&self) -> Box { - Box::new(BinsXbinDim0Collector::::new()) - } -} diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 402c854..b4575c2 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -1,12 +1,16 @@ use crate::framable::FrameType; use crate::merger; +use crate::merger::Mergeable; use crate::Events; use items_0::collect_s::Collectable; +use items_0::collect_s::Collected; use items_0::collect_s::Collector; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; use items_0::AsAnyMut; use items_0::AsAnyRef; +use items_0::MergeError; +use items_0::WithLen; use netpod::log::*; use netpod::BinnedRange; use netpod::BinnedRangeEnum; @@ -378,6 +382,7 @@ mod test_channel_events_serde { use bincode::config::WithOtherTrailing; use bincode::DefaultOptions; use items_0::bincode; + use items_0::Appendable; use items_0::Empty; use serde::Deserialize; use serde::Serialize; @@ 
-473,17 +478,19 @@ impl PartialEq for ChannelEvents { } } -impl crate::merger::Mergeable for ChannelEvents { +impl WithLen for ChannelEvents { fn len(&self) -> usize { match self { - ChannelEvents::Events(k) => k.len(), + ChannelEvents::Events(k) => k.as_ref().len(), ChannelEvents::Status(k) => match k { Some(_) => 1, None => 0, }, } } +} +impl Mergeable for ChannelEvents { fn ts_min(&self) -> Option { match self { ChannelEvents::Events(k) => k.ts_min(), @@ -511,18 +518,18 @@ impl crate::merger::Mergeable for ChannelEvents { } } - fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), merger::MergeError> { + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { match self { ChannelEvents::Events(k) => match dst { ChannelEvents::Events(j) => k.drain_into(j, range), - ChannelEvents::Status(_) => Err(merger::MergeError::NotCompatible), + ChannelEvents::Status(_) => Err(MergeError::NotCompatible), }, ChannelEvents::Status(k) => match dst { - ChannelEvents::Events(_) => Err(merger::MergeError::NotCompatible), + ChannelEvents::Events(_) => Err(MergeError::NotCompatible), ChannelEvents::Status(j) => match j { Some(_) => { trace!("drain_into merger::MergeError::Full"); - Err(merger::MergeError::Full) + Err(MergeError::Full) } None => { if range.0 > 0 { @@ -596,10 +603,7 @@ impl crate::merger::Mergeable for ChannelEvents { impl Collectable for ChannelEvents { fn new_collector(&self) -> Box { - match self { - ChannelEvents::Events(_item) => todo!(), - ChannelEvents::Status(_) => todo!(), - } + Box::new(ChannelEventsCollector::new()) } } @@ -727,11 +731,11 @@ impl items_0::collect_s::ToJsonResult for ChannelEventsCollectorOutput { } } -impl items_0::collect_c::Collected for ChannelEventsCollectorOutput {} +impl Collected for ChannelEventsCollectorOutput {} #[derive(Debug)] pub struct ChannelEventsCollector { - coll: Option>, + coll: Option>, range_complete: bool, timed_out: bool, } @@ -746,15 +750,14 @@ impl ChannelEventsCollector { } } -impl items_0::collect_c::Collector for ChannelEventsCollector { +impl WithLen for ChannelEventsCollector { fn len(&self) -> usize { - match &self.coll { - Some(coll) => coll.len(), - None => 0, - } + self.coll.as_ref().map_or(0, |x| x.len()) } +} - fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { +impl Collector for ChannelEventsCollector { + fn ingest(&mut self, item: &mut dyn Collectable) { if let Some(item) = item.as_any_mut().downcast_mut::() { match item { ChannelEvents::Events(item) => { @@ -786,7 +789,7 @@ impl items_0::collect_c::Collector for ChannelEventsCollector { &mut self, range: Option, binrange: Option, - ) -> Result, err::Error> { + ) -> Result, err::Error> { match self.coll.as_mut() { Some(coll) => { if self.range_complete { @@ -805,17 +808,3 @@ impl items_0::collect_c::Collector for ChannelEventsCollector { } } } - -impl items_0::WithLen for ChannelEvents { - fn len(&self) -> usize { - match self { - ChannelEvents::Events(k) => k.len(), - ChannelEvents::Status(_) => 1, - } - } -} -impl items_0::collect_c::Collectable for ChannelEvents { - fn new_collector(&self) -> Box { - Box::new(ChannelEventsCollector::new()) - } -} diff --git a/items_2/src/eventfull.rs b/items_2/src/eventfull.rs index 7685343..5f02042 100644 --- a/items_2/src/eventfull.rs +++ b/items_2/src/eventfull.rs @@ -1,11 +1,12 @@ use crate::framable::FrameType; +use crate::merger::Mergeable; use bytes::BytesMut; use items::ByteEstimate; -use items::Clearable; -use items::PushableIndex; use 
items::WithTimestamps; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::EVENT_FULL_FRAME_TYPE_ID; +use items_0::Empty; +use items_0::MergeError; use items_0::WithLen; use netpod::ScalarType; use netpod::Shape; @@ -69,19 +70,6 @@ mod decomps_serde { } impl EventFull { - pub fn empty() -> Self { - Self { - tss: VecDeque::new(), - pulses: VecDeque::new(), - blobs: VecDeque::new(), - decomps: VecDeque::new(), - scalar_types: VecDeque::new(), - be: VecDeque::new(), - shapes: VecDeque::new(), - comps: VecDeque::new(), - } - } - pub fn add_event( &mut self, ts: u64, @@ -132,54 +120,27 @@ impl FrameType for EventFull { } } +impl Empty for EventFull { + fn empty() -> Self { + Self { + tss: VecDeque::new(), + pulses: VecDeque::new(), + blobs: VecDeque::new(), + decomps: VecDeque::new(), + scalar_types: VecDeque::new(), + be: VecDeque::new(), + shapes: VecDeque::new(), + comps: VecDeque::new(), + } + } +} + impl WithLen for EventFull { fn len(&self) -> usize { self.tss.len() } } -impl items::WithLen for EventFull { - fn len(&self) -> usize { - self.tss.len() - } -} - -impl items::Appendable for EventFull { - fn empty_like_self(&self) -> Self { - Self::empty() - } - - // TODO expensive, get rid of it. - fn append(&mut self, src: &Self) { - self.tss.extend(&src.tss); - self.pulses.extend(&src.pulses); - self.blobs.extend(src.blobs.iter().map(Clone::clone)); - self.decomps.extend(src.decomps.iter().map(Clone::clone)); - self.scalar_types.extend(src.scalar_types.iter().map(Clone::clone)); - self.be.extend(&src.be); - self.shapes.extend(src.shapes.iter().map(Clone::clone)); - self.comps.extend(src.comps.iter().map(Clone::clone)); - } - - fn append_zero(&mut self, _ts1: u64, _ts2: u64) { - // TODO do we still need this type? - todo!() - } -} - -impl Clearable for EventFull { - fn clear(&mut self) { - self.tss.clear(); - self.pulses.clear(); - self.blobs.clear(); - self.decomps.clear(); - self.scalar_types.clear(); - self.be.clear(); - self.shapes.clear(); - self.comps.clear(); - } -} - impl WithTimestamps for EventFull { fn ts(&self, ix: usize) -> u64 { self.tss[ix] @@ -199,16 +160,57 @@ impl ByteEstimate for EventFull { } } -impl PushableIndex for EventFull { - // TODO check all use cases, can't we move? 
- fn push_index(&mut self, src: &Self, ix: usize) { - self.tss.push_back(src.tss[ix]); - self.pulses.push_back(src.pulses[ix]); - self.blobs.push_back(src.blobs[ix].clone()); - self.decomps.push_back(src.decomps[ix].clone()); - self.scalar_types.push_back(src.scalar_types[ix].clone()); - self.be.push_back(src.be[ix]); - self.shapes.push_back(src.shapes[ix].clone()); - self.comps.push_back(src.comps[ix].clone()); +impl Mergeable for EventFull { + fn ts_min(&self) -> Option { + self.tss.front().map(|&x| x) + } + + fn ts_max(&self) -> Option { + self.tss.back().map(|&x| x) + } + + fn new_empty(&self) -> Self { + Empty::empty() + } + + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { + // TODO make it harder to forget new members when the struct may get modified in the future + let r = range.0..range.1; + dst.tss.extend(self.tss.drain(r.clone())); + dst.pulses.extend(self.pulses.drain(r.clone())); + dst.blobs.extend(self.blobs.drain(r.clone())); + dst.decomps.extend(self.decomps.drain(r.clone())); + dst.scalar_types.extend(self.scalar_types.drain(r.clone())); + dst.be.extend(self.be.drain(r.clone())); + dst.shapes.extend(self.shapes.drain(r.clone())); + dst.comps.extend(self.comps.drain(r.clone())); + Ok(()) + } + + fn find_lowest_index_gt(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate() { + if m > ts { + return Some(i); + } + } + None + } + + fn find_lowest_index_ge(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate() { + if m >= ts { + return Some(i); + } + } + None + } + + fn find_highest_index_lt(&self, ts: u64) -> Option { + for (i, &m) in self.tss.iter().enumerate().rev() { + if m < ts { + return Some(i); + } + } + None } } diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 92f9c32..e8f9325 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -4,6 +4,12 @@ use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use err::Error; +use items_0::collect_s::Collectable; +use items_0::collect_s::Collected; +use items_0::collect_s::Collector; +use items_0::collect_s::CollectorType; +use items_0::collect_s::ToJsonBytes; +use items_0::collect_s::ToJsonResult; use items_0::scalar_ops::ScalarOps; use items_0::Appendable; use items_0::AsAnyMut; @@ -11,13 +17,14 @@ use items_0::AsAnyRef; use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; +use items_0::MergeError; use items_0::TimeBinnable; use items_0::TimeBinner; use items_0::WithLen; use netpod::log::*; +use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; -use netpod::Dim0Kind; use netpod::NanoRange; use netpod::SeriesRange; use serde::Deserialize; @@ -42,14 +49,6 @@ pub struct EventsDim0 { } impl EventsDim0 { - #[inline(always)] - pub fn push(&mut self, ts: u64, pulse: u64, value: NTY) { - self.tss.push_back(ts); - self.pulses.push_back(pulse); - self.values.push_back(value); - } - - #[inline(always)] pub fn push_front(&mut self, ts: u64, pulse: u64, value: NTY) { self.tss.push_front(ts); self.pulses.push_front(pulse); @@ -327,16 +326,16 @@ where } } -impl items_0::collect_s::ToJsonResult for EventsDim0CollectorOutput { - fn to_json_result(&self) -> Result, Error> { +impl ToJsonResult for EventsDim0CollectorOutput { + fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; Ok(Box::new(k)) } } -impl items_0::collect_c::Collected for EventsDim0CollectorOutput {} +impl Collected for EventsDim0CollectorOutput 
{} -impl items_0::collect_s::CollectorType for EventsDim0Collector { +impl CollectorType for EventsDim0Collector { type Input = EventsDim0; type Output = EventsDim0CollectorOutput; @@ -376,11 +375,11 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector Some(IsoDateTime::from_u64(x.beg + netpod::timeunits::SEC)), + SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)), SeriesRange::PulseRange(x) => { error!("TODO emit create continueAt for pulse range"); None @@ -434,39 +433,6 @@ impl items_0::collect_s::CollectableType for EventsDim0 { } } -impl items_0::collect_c::Collector for EventsDim0Collector { - fn len(&self) -> usize { - self.vals.as_ref().map_or(0, |x| x.len()) - } - - fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { - if let Some(item) = item.as_any_mut().downcast_mut::>() { - items_0::collect_s::CollectorType::ingest(self, item) - } else { - error!("EventsDim0Collector::ingest unexpected item {:?}", item); - } - } - - fn set_range_complete(&mut self) { - items_0::collect_s::CollectorType::set_range_complete(self) - } - - fn set_timed_out(&mut self) { - items_0::collect_s::CollectorType::set_timed_out(self) - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, err::Error> { - match items_0::collect_s::CollectorType::result(self, range, binrange) { - Ok(x) => Ok(Box::new(x)), - Err(e) => Err(e.into()), - } - } -} - pub struct EventsDim0Aggregator { range: SeriesRange, count: u64, @@ -831,15 +797,15 @@ impl Events for EventsDim0 { } } - fn as_collectable_mut(&mut self) -> &mut dyn items_0::collect_s::Collectable { + fn as_collectable_mut(&mut self) -> &mut dyn Collectable { self } - fn as_collectable_with_default_ref(&self) -> &dyn items_0::collect_c::CollectableWithDefault { + fn as_collectable_with_default_ref(&self) -> &dyn Collectable { self } - fn as_collectable_with_default_mut(&mut self) -> &mut dyn items_0::collect_c::CollectableWithDefault { + fn as_collectable_with_default_mut(&mut self) -> &mut dyn Collectable { self } @@ -857,7 +823,7 @@ impl Events for EventsDim0 { Box::new(Self::empty()) } - fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), items_0::MergeError> { + fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -868,7 +834,7 @@ impl Events for EventsDim0 { Ok(()) } else { error!("downcast to EventsDim0 FAILED"); - Err(items_0::MergeError::NotCompatible) + Err(MergeError::NotCompatible) } } @@ -1132,95 +1098,14 @@ impl TimeBinner for EventsDim0TimeBinner { } } -// TODO remove this struct? 
-#[derive(Debug)] -pub struct EventsDim0CollectorDyn {} - -impl EventsDim0CollectorDyn { - pub fn new() -> Self { - Self {} - } -} - -impl items_0::collect_c::CollectorDyn for EventsDim0CollectorDyn { - fn len(&self) -> usize { - todo!() - } - - fn ingest(&mut self, _item: &mut dyn items_0::collect_c::CollectableWithDefault) { - todo!() - } - - fn set_range_complete(&mut self) { - todo!() - } - - fn set_timed_out(&mut self) { - todo!() - } - - fn result( - &mut self, - _range: Option, - _binrange: Option, - ) -> Result, err::Error> { - todo!() - } -} - -impl items_0::collect_c::CollectorDyn for EventsDim0Collector { - fn len(&self) -> usize { - WithLen::len(self) - } - - fn ingest(&mut self, item: &mut dyn items_0::collect_c::CollectableWithDefault) { - let x = item.as_any_mut(); - if let Some(item) = x.downcast_mut::>() { - items_0::collect_s::CollectorType::ingest(self, item) - } else { - // TODO need possibility to return error - () - } - } - - fn set_range_complete(&mut self) { - items_0::collect_s::CollectorType::set_range_complete(self); - } - - fn set_timed_out(&mut self) { - items_0::collect_s::CollectorType::set_timed_out(self); - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, err::Error> { - items_0::collect_s::CollectorType::result(self, range, binrange) - .map(|x| Box::new(x) as _) - .map_err(|e| e.into()) - } -} - -impl items_0::collect_c::CollectableWithDefault for EventsDim0 { - fn new_collector(&self) -> Box { - let coll = EventsDim0Collector::::new(); - Box::new(coll) - } -} - -impl items_0::collect_c::Collectable for EventsDim0 { - fn new_collector(&self) -> Box { - Box::new(EventsDim0Collector::::new()) - } -} - impl Appendable for EventsDim0 where STY: ScalarOps, { fn push(&mut self, ts: u64, pulse: u64, value: STY) { - Self::push(self, ts, pulse, value) + self.tss.push_back(ts); + self.pulses.push_back(pulse); + self.values.push_back(value); } } diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index 2b85507..baf11c5 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -4,6 +4,12 @@ use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use err::Error; +use items_0::collect_s::Collectable; +use items_0::collect_s::CollectableType; +use items_0::collect_s::Collected; +use items_0::collect_s::CollectorType; +use items_0::collect_s::ToJsonBytes; +use items_0::collect_s::ToJsonResult; use items_0::scalar_ops::ScalarOps; use items_0::Appendable; use items_0::AsAnyMut; @@ -11,20 +17,20 @@ use items_0::AsAnyRef; use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; +use items_0::MergeError; use items_0::TimeBinnable; use items_0::TimeBinner; use items_0::WithLen; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::SeriesRange; use serde::Deserialize; use serde::Serialize; +use std::any; use std::any::Any; use std::collections::VecDeque; use std::fmt; -use std::mem; #[allow(unused)] macro_rules! 
trace2 { @@ -257,16 +263,16 @@ where } } -impl items_0::collect_s::ToJsonResult for EventsDim1CollectorOutput { - fn to_json_result(&self) -> Result, Error> { +impl ToJsonResult for EventsDim1CollectorOutput { + fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; Ok(Box::new(k)) } } -impl items_0::collect_c::Collected for EventsDim1CollectorOutput {} +impl Collected for EventsDim1CollectorOutput {} -impl items_0::collect_s::CollectorType for EventsDim1Collector { +impl CollectorType for EventsDim1Collector { type Input = EventsDim1; type Output = EventsDim1CollectorOutput; @@ -328,7 +334,7 @@ impl items_0::collect_s::CollectorType for EventsDim1Collector items_0::collect_s::CollectableType for EventsDim1 { +impl CollectableType for EventsDim1 { type Collector = EventsDim1Collector; fn new_collector() -> Self::Collector { @@ -336,39 +342,6 @@ impl items_0::collect_s::CollectableType for EventsDim1 { } } -impl items_0::collect_c::Collector for EventsDim1Collector { - fn len(&self) -> usize { - self.vals.len() - } - - fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) { - if let Some(item) = item.as_any_mut().downcast_mut::>() { - items_0::collect_s::CollectorType::ingest(self, item) - } else { - error!("EventsDim0Collector::ingest unexpected item {:?}", item); - } - } - - fn set_range_complete(&mut self) { - items_0::collect_s::CollectorType::set_range_complete(self) - } - - fn set_timed_out(&mut self) { - items_0::collect_s::CollectorType::set_timed_out(self) - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, err::Error> { - match items_0::collect_s::CollectorType::result(self, range, binrange) { - Ok(x) => Ok(Box::new(x)), - Err(e) => Err(e.into()), - } - } -} - pub struct EventsDim1Aggregator { range: SeriesRange, count: u64, @@ -626,11 +599,11 @@ impl TimeBinnableTypeAggregator for EventsDim1Aggregator { fn ingest(&mut self, item: &Self::Input) { if true { - trace!("{} ingest {} events", std::any::type_name::(), item.len()); + trace!("{} ingest {} events", any::type_name::(), item.len()); } if false { for (i, &ts) in item.tss.iter().enumerate() { - trace!("{} ingest {:6} {:20}", std::any::type_name::(), i, ts); + trace!("{} ingest {:6} {:20}", any::type_name::(), i, ts); } } if self.do_time_weight { @@ -657,7 +630,7 @@ impl TimeBinnable for EventsDim1 { Box::new(ret) } - fn to_box_to_json_result(&self) -> Box { + fn to_box_to_json_result(&self) -> Box { let k = serde_json::to_value(self).unwrap(); Box::new(k) as _ } @@ -722,15 +695,15 @@ impl Events for EventsDim1 { } } - fn as_collectable_mut(&mut self) -> &mut dyn items_0::collect_s::Collectable { + fn as_collectable_mut(&mut self) -> &mut dyn Collectable { self } - fn as_collectable_with_default_ref(&self) -> &dyn items_0::collect_c::CollectableWithDefault { + fn as_collectable_with_default_ref(&self) -> &dyn Collectable { self } - fn as_collectable_with_default_mut(&mut self) -> &mut dyn items_0::collect_c::CollectableWithDefault { + fn as_collectable_with_default_mut(&mut self) -> &mut dyn Collectable { self } @@ -748,7 +721,7 @@ impl Events for EventsDim1 { Box::new(Self::empty()) } - fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), items_0::MergeError> { + fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. 
if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -759,7 +732,7 @@ impl Events for EventsDim1 { Ok(()) } else { error!("downcast to EventsDim0 FAILED"); - Err(items_0::MergeError::NotCompatible) + Err(MergeError::NotCompatible) } } @@ -1031,89 +1004,6 @@ impl TimeBinner for EventsDim1TimeBinner { } } -// TODO remove this struct? -#[derive(Debug)] -pub struct EventsDim1CollectorDyn {} - -impl EventsDim1CollectorDyn { - pub fn new() -> Self { - Self {} - } -} - -impl items_0::collect_c::CollectorDyn for EventsDim1CollectorDyn { - fn len(&self) -> usize { - todo!() - } - - fn ingest(&mut self, _item: &mut dyn items_0::collect_c::CollectableWithDefault) { - todo!() - } - - fn set_range_complete(&mut self) { - todo!() - } - - fn set_timed_out(&mut self) { - todo!() - } - - fn result( - &mut self, - _range: Option, - _binrange: Option, - ) -> Result, err::Error> { - todo!() - } -} - -impl items_0::collect_c::CollectorDyn for EventsDim1Collector { - fn len(&self) -> usize { - WithLen::len(self) - } - - fn ingest(&mut self, item: &mut dyn items_0::collect_c::CollectableWithDefault) { - let x = item.as_any_mut(); - if let Some(item) = x.downcast_mut::>() { - items_0::collect_s::CollectorType::ingest(self, item) - } else { - // TODO need possibility to return error - () - } - } - - fn set_range_complete(&mut self) { - items_0::collect_s::CollectorType::set_range_complete(self); - } - - fn set_timed_out(&mut self) { - items_0::collect_s::CollectorType::set_timed_out(self); - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, err::Error> { - items_0::collect_s::CollectorType::result(self, range, binrange) - .map(|x| Box::new(x) as _) - .map_err(|e| e.into()) - } -} - -impl items_0::collect_c::CollectableWithDefault for EventsDim1 { - fn new_collector(&self) -> Box { - let coll = EventsDim1Collector::::new(); - Box::new(coll) - } -} - -impl items_0::collect_c::Collectable for EventsDim1 { - fn new_collector(&self) -> Box { - Box::new(EventsDim1Collector::::new()) - } -} - impl Appendable> for EventsDim1 where STY: ScalarOps, diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index b8c0394..8d07da5 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -4,13 +4,17 @@ use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use err::Error; +use items_0::collect_s::CollectableType; +use items_0::collect_s::Collected; +use items_0::collect_s::CollectorType; +use items_0::collect_s::ToJsonBytes; +use items_0::collect_s::ToJsonResult; use items_0::scalar_ops::ScalarOps; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; use items_0::WithLen; use netpod::log::*; -use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::SeriesRange; use serde::Deserialize; @@ -387,17 +391,17 @@ where } } -impl items_0::collect_s::ToJsonResult for EventsXbinDim0CollectorOutput +impl ToJsonResult for EventsXbinDim0CollectorOutput where NTY: ScalarOps, { - fn to_json_result(&self) -> Result, Error> { + fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; Ok(Box::new(k)) } } -impl items_0::collect_c::Collected for EventsXbinDim0CollectorOutput where NTY: ScalarOps {} +impl Collected for EventsXbinDim0CollectorOutput where NTY: ScalarOps {} #[derive(Debug)] pub struct EventsXbinDim0Collector { @@ -422,7 +426,7 @@ impl WithLen for EventsXbinDim0Collector 
{ } } -impl items_0::collect_s::CollectorType for EventsXbinDim0Collector +impl CollectorType for EventsXbinDim0Collector where NTY: ScalarOps, { @@ -490,7 +494,7 @@ where } } -impl items_0::collect_s::CollectableType for EventsXbinDim0 +impl CollectableType for EventsXbinDim0 where NTY: ScalarOps, { @@ -500,41 +504,3 @@ where Self::Collector::new() } } - -impl items_0::collect_c::Collector for EventsXbinDim0Collector -where - NTY: ScalarOps, -{ - fn len(&self) -> usize { - todo!() - } - - fn ingest(&mut self, _item: &mut dyn items_0::collect_c::Collectable) { - todo!() - } - - fn set_range_complete(&mut self) { - todo!() - } - - fn set_timed_out(&mut self) { - todo!() - } - - fn result( - &mut self, - _range: Option, - _binrange: Option, - ) -> Result, Error> { - todo!() - } -} - -impl items_0::collect_c::Collectable for EventsXbinDim0 -where - NTY: ScalarOps, -{ - fn new_collector(&self) -> Box { - Box::new(::new_collector()) - } -} diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 8c384f0..883bf53 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -22,19 +22,15 @@ use channelevents::ChannelEvents; use chrono::DateTime; use chrono::TimeZone; use chrono::Utc; -use futures_util::FutureExt; use futures_util::Stream; -use futures_util::StreamExt; use items_0::streamitem::Sitemty; use items_0::Empty; use items_0::Events; +use items_0::MergeError; use items_0::RangeOverlapInfo; -use netpod::log::*; +use merger::Mergeable; use netpod::timeunits::*; -use netpod::NanoRange; -use netpod::ScalarType; use netpod::SeriesRange; -use netpod::Shape; use serde::Deserialize; use serde::Serialize; use serde::Serializer; @@ -78,9 +74,6 @@ pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque) { (pulse_anchor, pulse_off) } -#[allow(unused)] -struct Ts(u64); - #[derive(Debug, PartialEq)] pub enum ErrorKind { General, @@ -169,11 +162,7 @@ pub fn make_iso_ts(tss: &[u64]) -> Vec { .collect() } -impl crate::merger::Mergeable for Box { - fn len(&self) -> usize { - self.as_ref().len() - } - +impl Mergeable for Box { fn ts_min(&self) -> Option { self.as_ref().ts_min() } @@ -186,7 +175,7 @@ impl crate::merger::Mergeable for Box { self.as_ref().new_empty() } - fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), merger::MergeError> { + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { self.as_mut().drain_into(dst, range) } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index c1dfe98..b5b92eb 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -1,10 +1,13 @@ -use crate::Error; +pub use crate::Error; + use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::sitem_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::MergeError; +use items_0::WithLen; use netpod::log::*; use std::collections::VecDeque; use std::fmt; @@ -31,11 +34,7 @@ macro_rules! 
trace4 {
     ($($arg:tt)*) => (trace!($($arg)*));
 }
 
-// TODO
-pub use items_0::MergeError;
-
-pub trait Mergeable: fmt::Debug + Unpin {
-    fn len(&self) -> usize;
+pub trait Mergeable: fmt::Debug + WithLen + Unpin {
     fn ts_min(&self) -> Option<u64>;
     fn ts_max(&self) -> Option<u64>;
     fn new_empty(&self) -> Self;
diff --git a/items_2/src/test.rs b/items_2/src/test.rs
index 20a040e..f6039d5 100644
--- a/items_2/src/test.rs
+++ b/items_2/src/test.rs
@@ -1,3 +1,6 @@
+#[cfg(test)]
+pub mod eventsdim0;
+
 use crate::binnedcollected::BinnedCollected;
 use crate::binsdim0::BinsDim0CollectedResult;
 use crate::channelevents::ConnStatus;
@@ -21,7 +24,9 @@ use items_0::streamitem::sitem_data;
 use items_0::streamitem::RangeCompletableItem;
 use items_0::streamitem::Sitemty;
 use items_0::streamitem::StreamItem;
+use items_0::Appendable;
 use items_0::Empty;
+use items_0::WithLen;
 use netpod::log::*;
 use netpod::timeunits::*;
 use netpod::AggKind;
diff --git a/items_2/src/test/eventsdim0.rs b/items_2/src/test/eventsdim0.rs
new file mode 100644
index 0000000..61e6080
--- /dev/null
+++ b/items_2/src/test/eventsdim0.rs
@@ -0,0 +1,24 @@
+use crate::eventsdim0::EventsDim0;
+use items_0::Appendable;
+use items_0::Empty;
+use items_0::Events;
+
+#[test]
+fn collect_s_00() {
+    let mut evs = EventsDim0::empty();
+    evs.push(123, 4, 1.00f32);
+    evs.push(124, 5, 1.01);
+    let mut coll = evs.as_collectable_mut().new_collector();
+    coll.ingest(&mut evs);
+    assert_eq!(coll.len(), 2);
+}
+
+#[test]
+fn collect_c_00() {
+    let mut evs = EventsDim0::empty();
+    evs.push(123, 4, 1.00f32);
+    evs.push(124, 5, 1.01);
+    let mut coll = evs.as_collectable_with_default_ref().new_collector();
+    coll.ingest(&mut evs);
+    assert_eq!(coll.len(), 2);
+}
diff --git a/items_2/src/testgen.rs b/items_2/src/testgen.rs
index 8f72352..870b9df 100644
--- a/items_2/src/testgen.rs
+++ b/items_2/src/testgen.rs
@@ -1,5 +1,6 @@
 use crate::eventsdim0::EventsDim0;
 use crate::Events;
+use items_0::Appendable;
 use items_0::Empty;
 
 #[allow(unused)]
diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs
index 0f0647a..3d8af4c 100644
--- a/netpod/src/netpod.rs
+++ b/netpod/src/netpod.rs
@@ -7,20 +7,26 @@ pub mod transform;
 use crate::log::*;
 use bytes::Bytes;
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::DateTime;
+use chrono::TimeZone;
+use chrono::Utc;
 use err::Error;
-use futures_util::{Stream, StreamExt};
-use serde::{Deserialize, Serialize};
+use futures_util::Stream;
+use futures_util::StreamExt;
+use serde::Deserialize;
+use serde::Serialize;
 use serde_json::Value as JsVal;
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::BTreeMap;
+use std::collections::VecDeque;
+use std::fmt;
 use std::iter::FromIterator;
 use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::str::FromStr;
-use std::task::{Context, Poll};
+use std::task::Context;
+use std::task::Poll;
 use std::time::Duration;
-use std::{fmt, ops};
 use timeunits::*;
 use url::Url;
@@ -757,6 +763,17 @@ impl NanoRange {
     }
 }
+
+impl TryFrom<&SeriesRange> for NanoRange {
+    type Error = Error;
+
+    fn try_from(val: &SeriesRange) -> Result<Self, Self::Error> {
+        match val {
+            SeriesRange::TimeRange(x) => Ok(x.clone()),
+            SeriesRange::PulseRange(_) => Err(Error::with_msg_no_trace("not a Time range")),
+        }
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct PulseRange {
     pub beg: u64,
     pub end: u64,
 }
@@ -813,17 +830,6 @@ impl SeriesRange {
     }
 }
-impl TryFrom<&SeriesRange> for NanoRange {
-    type Error = Error;
-
-    fn try_from(val: &SeriesRange) -> Result<Self, Self::Error> {
-        match val {
-            SeriesRange::TimeRange(x) => Ok(x.clone()),
-            SeriesRange::PulseRange(_) => Err(Error::with_msg_no_trace("not a Time range")),
-        }
-    }
-}
-
 impl From<NanoRange> for SeriesRange {
     fn from(k: NanoRange) -> Self {
         Self::TimeRange(k)
@@ -1505,6 +1511,17 @@ impl PreBinnedPatchCoordEnum {
     pub fn bin_count(&self) -> u64 {
         todo!()
     }
+
+    pub fn span_desc(&self) -> String {
+        match self {
+            PreBinnedPatchCoordEnum::Time(k) => {
+                format!("pre-W-{}-B-{}", k.bin_len.0 * k.bin_count / SEC, k.patch_offset / SEC)
+            }
+            PreBinnedPatchCoordEnum::Pulse(k) => {
+                format!("pre-W-{}-B-{}", k.bin_len.0 * k.bin_count / SEC, k.patch_offset / SEC)
+            }
+        }
+    }
 }
 
 impl FromUrl for PreBinnedPatchCoordEnum {
diff --git a/netpod/src/query.rs b/netpod/src/query.rs
index 3dbd6d5..7940b06 100644
--- a/netpod/src/query.rs
+++ b/netpod/src/query.rs
@@ -298,6 +298,18 @@ impl PlainEventsQuery {
     pub fn set_do_test_stream_error(&mut self, k: bool) {
         self.do_test_stream_error = k;
     }
+
+    pub fn for_event_blobs(self) -> Self {
+        error!("set transform to event blobs");
+        err::todo();
+        self
+    }
+
+    pub fn for_time_weighted_scalar(self) -> Self {
+        error!("set transform to time weighted scalar");
+        err::todo();
+        self
+    }
 }
 
 impl HasBackend for PlainEventsQuery {
@@ -510,6 +522,11 @@ impl BinnedQuery {
     pub fn set_buf_len_disk_io(&mut self, k: usize) {
         self.buf_len_disk_io = Some(k);
     }
+
+    pub fn for_time_weighted_scalar(self) -> Self {
+        err::todo();
+        self
+    }
 }
 
 impl HasBackend for BinnedQuery {
diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs
index acfce6b..d9e2748 100644
--- a/nodenet/src/conn.rs
+++ b/nodenet/src/conn.rs
@@ -5,6 +5,7 @@ use items_0::streamitem::RangeCompletableItem;
 use items_0::streamitem::Sitemty;
 use items_0::streamitem::StreamItem;
 use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME;
+use items_0::Appendable;
 use items_0::Empty;
 use items_2::channelevents::ChannelEvents;
 use items_2::framable::EventQueryJsonStringFrame;
@@ -117,7 +118,7 @@ async fn make_channel_events_stream(
     debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}");
     let stream = scyllaconn::events::EventsStreamScylla::new(
         series,
-        evq.range().clone(),
+        evq.range().into(),
         do_one_before_range,
         scalar_type,
         shape,
@@ -127,11 +128,11 @@
     );
     let stream = stream
         .map({
-            let agg_kind = evq.agg_kind_value();
             let mut pulse_last = None;
             move |item| match item {
                 Ok(item) => {
-                    let x = if let AggKind::PulseIdDiff = agg_kind {
+                    // TODO support pulseid extract
+                    let x = if false {
                         let x = match item {
                             ChannelEvents::Events(item) => {
                                 let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item);
@@ -276,51 +277,55 @@
         return Err((e, netout).into());
     }
 
-    let mut stream: Pin> + Send>> =
-        if let AggKind::EventBlobs = evq.agg_kind_value() {
-            match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
-                Ok(stream) => {
-                    let stream = stream.map(|x| Box::new(x) as _);
-                    Box::pin(stream)
-                }
-                Err(e) => {
-                    return Err((e, netout).into());
-                }
+    let mut stream: Pin> + Send>> = if false {
+        if true {
+            error!("TODO support event blob transform");
+            let e = Error::with_msg(format!("TODO support event blob transform"));
+            return Err((e, netout).into());
+        }
+        match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
+            Ok(stream) => {
+                let stream = stream.map(|x| Box::new(x) as _);
+                Box::pin(stream)
             }
-        } else {
-            match make_channel_events_stream(evq, node_config).await {
-                Ok(stream) => {
-                    let stream = stream
-                        .map({
-                            use
items_2::eventtransform::EventTransform; - let mut tf = items_2::eventtransform::IdentityTransform::default(); - move |item| match item { - Ok(item2) => match item2 { - StreamItem::DataItem(item3) => match item3 { - RangeCompletableItem::Data(item4) => match item4 { - ChannelEvents::Events(item5) => { - let a = tf.transform(item5); - Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Events(a), - ))) - } - x => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), - }, - x => Ok(StreamItem::DataItem(x)), + Err(e) => { + return Err((e, netout).into()); + } + } + } else { + match make_channel_events_stream(evq, node_config).await { + Ok(stream) => { + let stream = stream + .map({ + use items_2::eventtransform::EventTransform; + let mut tf = items_2::eventtransform::IdentityTransform::default(); + move |item| match item { + Ok(item2) => match item2 { + StreamItem::DataItem(item3) => match item3 { + RangeCompletableItem::Data(item4) => match item4 { + ChannelEvents::Events(item5) => { + let a = tf.transform(item5); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( + a, + )))) + } + x => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), }, - x => Ok(x), + x => Ok(StreamItem::DataItem(x)), }, - _ => item, - } - }) - .map(|x| Box::new(x) as _); - Box::pin(stream) - } - Err(e) => { - return Err((e, netout).into()); - } + x => Ok(x), + }, + _ => item, + } + }) + .map(|x| Box::new(x) as _); + Box::pin(stream) } - }; + Err(e) => { + return Err((e, netout).into()); + } + } + }; let mut buf_len_histo = HistoLog2::new(5); while let Some(item) = stream.next().await { diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 477a8df..17e830a 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -81,13 +81,7 @@ fn raw_data_00() { beg: SEC, end: SEC * 10, }; - let qu = PlainEventsQuery::new( - channel, - range, - Some(AggKind::Plain), - Some(Duration::from_millis(10000)), - None, - ); + let qu = PlainEventsQuery::new(channel, range); let query = EventQueryJsonStringFrame(serde_json::to_string(&qu).unwrap()); let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(query))); let frame = make_frame(&item).unwrap(); diff --git a/scyllaconn/src/scyllaconn.rs b/scyllaconn/src/scyllaconn.rs index 239e16c..5802fba 100644 --- a/scyllaconn/src/scyllaconn.rs +++ b/scyllaconn/src/scyllaconn.rs @@ -7,6 +7,7 @@ pub mod status; use err::Error; use errconv::ErrConv; use netpod::ScyllaConfig; +use netpod::SeriesRange; use scylla::statement::Consistency; use scylla::Session as ScySession; use std::sync::Arc; @@ -17,6 +18,15 @@ pub struct ScyllaSeriesRange { end: u64, } +impl From<&SeriesRange> for ScyllaSeriesRange { + fn from(value: &SeriesRange) -> Self { + match value { + SeriesRange::TimeRange(k) => Self { beg: k.beg, end: k.end }, + SeriesRange::PulseRange(k) => Self { beg: k.beg, end: k.end }, + } + } +} + pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result, Error> { let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 040b517..21c0efa 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -1,11 +1,14 @@ use err::Error; use futures_util::Stream; use futures_util::StreamExt; -use items_0::collect_c::Collectable; +use items_0::collect_s::Collectable; +use items_0::collect_s::Collected; +use items_0::collect_s::Collector; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use 
items_0::streamitem::StatsItem; use items_0::streamitem::StreamItem; +use items_0::WithLen; use netpod::log::*; use netpod::BinnedRangeEnum; use netpod::DiskStats; @@ -39,12 +42,12 @@ async fn collect_in_span( events_max: u64, range: Option, binrange: Option, -) -> Result, Error> +) -> Result, Error> where S: Stream> + Unpin, - T: Collectable + items_0::WithLen + fmt::Debug, + T: Collectable + WithLen + fmt::Debug, { - let mut collector: Option> = None; + let mut collector: Option> = None; let mut stream = stream; let deadline = deadline.into(); let mut range_complete = false; @@ -138,12 +141,12 @@ pub async fn collect( events_max: u64, range: Option, binrange: Option, -) -> Result, Error> +) -> Result, Error> where S: Stream> + Unpin, - T: Collectable + items_0::WithLen + fmt::Debug, + T: Collectable + WithLen + fmt::Debug, { - let span = tracing::span!(tracing::Level::TRACE, "collect"); + let span = span!(Level::INFO, "collect"); collect_in_span(stream, deadline, events_max, range, binrange) .instrument(span) .await diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs index 5f90738..f931692 100644 --- a/streams/src/eventchunker.rs +++ b/streams/src/eventchunker.rs @@ -9,6 +9,7 @@ use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StatsItem; use items_0::streamitem::StreamItem; +use items_0::Empty; use items_0::WithLen; use items_2::eventfull::EventFull; use netpod::histo::HistoLog2; diff --git a/streams/src/lib.rs b/streams/src/lib.rs index fb12678..b61166d 100644 --- a/streams/src/lib.rs +++ b/streams/src/lib.rs @@ -3,10 +3,8 @@ pub mod dtflags; pub mod eventchunker; pub mod filechunkread; pub mod frames; -pub mod merge; pub mod needminbuffer; pub mod plaineventsjson; -pub mod rangefilter; pub mod rangefilter2; pub mod slidebuf; pub mod tcprawclient; diff --git a/streams/src/merge.rs b/streams/src/merge.rs deleted file mode 100644 index 8b13789..0000000 --- a/streams/src/merge.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/streams/src/rangefilter.rs b/streams/src/rangefilter.rs deleted file mode 100644 index 5ec5ac0..0000000 --- a/streams/src/rangefilter.rs +++ /dev/null @@ -1,254 +0,0 @@ -use err::Error; -use futures_util::Stream; -use futures_util::StreamExt; -use items::Appendable; -use items::Clearable; -use items::PushableIndex; -use items::WithTimestamps; -use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::Sitemty; -use items_0::streamitem::StatsItem; -use items_0::streamitem::StreamItem; -use netpod::log::*; -use netpod::NanoRange; -use netpod::Nanos; -use netpod::RangeFilterStats; -use std::fmt; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -pub struct RangeFilter -where - S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, -{ - inp: S, - range: NanoRange, - range_str: String, - expand: bool, - ts_max: u64, - stats: RangeFilterStats, - prerange: Option, - have_pre: bool, - have_range_complete: bool, - emitted_post: bool, - data_done: bool, - raco_done: bool, - done: bool, - complete: bool, - items_with_pre: usize, - items_with_post: usize, - items_with_unordered: usize, -} - -impl RangeFilter -where - S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, -{ - pub fn new(inp: S, range: NanoRange, expand: bool) -> Self { - trace!("RangeFilter::new range: {:?} expand: {:?}", range, expand); - Self { - inp, - range_str: format!("{:?}", range), - range, - expand, - ts_max: 0, - stats: 
RangeFilterStats::new(), - prerange: None, - have_pre: false, - have_range_complete: false, - emitted_post: false, - data_done: false, - raco_done: false, - done: false, - complete: false, - items_with_pre: 0, - items_with_post: 0, - items_with_unordered: 0, - } - } -} - -impl RangeFilter -where - S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, -{ - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { - use Poll::*; - loop { - break if self.complete { - panic!("poll_next on complete"); - } else if self.done { - self.complete = true; - Ready(None) - } else if self.raco_done { - self.done = true; - let k = std::mem::replace(&mut self.stats, RangeFilterStats::new()); - let k = StatsItem::RangeFilterStats(k); - Ready(Some(Ok(StreamItem::Stats(k)))) - } else if self.data_done { - self.raco_done = true; - if self.have_range_complete { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } else { - continue; - } - } else { - match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => match item { - Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => { - let mut contains_pre = false; - let mut contains_post = false; - let mut contains_unordered = false; - let mut ret = item.empty_like_self(); - for i1 in 0..item.len() { - let ts = item.ts(i1); - if ts < self.ts_max { - contains_unordered = true; - if false { - self.done = true; - let msg = format!( - "unordered event i1 {} / {} ts {:?} ts_max {:?}", - i1, - item.len(), - Nanos::from_ns(ts), - Nanos::from_ns(self.ts_max) - ); - error!("{}", msg); - return Ready(Some(Err(Error::with_msg(msg)))); - } - } else { - self.ts_max = ts; - if ts < self.range.beg { - contains_pre = true; - if self.expand { - let mut prerange = if let Some(prerange) = self.prerange.take() { - prerange - } else { - item.empty_like_self() - }; - prerange.clear(); - prerange.push_index(&item, i1); - self.prerange = Some(prerange); - self.have_pre = true; - } - } else if ts >= self.range.end { - contains_post = true; - self.have_range_complete = true; - if self.expand { - if self.have_pre { - let prerange = if let Some(prerange) = &mut self.prerange { - prerange - } else { - panic!() - }; - ret.push_index(prerange, 0); - prerange.clear(); - self.have_pre = false; - } - if !self.emitted_post { - self.emitted_post = true; - ret.push_index(&item, i1); - //self.data_done = true; - } - } else { - //self.data_done = true; - } - } else { - if self.expand { - if self.have_pre { - let prerange = if let Some(prerange) = &mut self.prerange { - prerange - } else { - panic!() - }; - ret.push_index(prerange, 0); - prerange.clear(); - self.have_pre = false; - } - } - ret.push_index(&item, i1); - } - } - } - if contains_pre { - self.items_with_pre += 1; - } - if contains_post { - self.items_with_post += 1; - } - if contains_unordered { - self.items_with_unordered += 1; - } - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) - } - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { - self.have_range_complete = true; - continue; - } - k => Ready(Some(k)), - }, - Ready(None) => { - self.data_done = true; - if self.have_pre { - let prerange = if let Some(prerange) = &mut self.prerange { - prerange - } else { - panic!() - }; - let mut ret = prerange.empty_like_self(); - ret.push_index(&prerange, 0); - prerange.clear(); - self.have_pre = false; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) - } else { - continue; - } - } - Pending => 
Pending, - } - }; - } - } -} - -impl Stream for RangeFilter -where - S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, -{ - type Item = Sitemty; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let span1 = span!(Level::INFO, "RangeFilter", range = tracing::field::Empty); - span1.record("range", &self.range_str.as_str()); - span1.in_scope(|| Self::poll_next(self, cx)) - } -} - -impl fmt::Debug for RangeFilter -where - S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("RangeFilter") - .field("items_with_pre", &self.items_with_pre) - .field("items_with_post", &self.items_with_post) - .field("items_with_unordered", &self.items_with_unordered) - .finish() - } -} - -impl Drop for RangeFilter -where - S: Stream> + Unpin, - ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, -{ - fn drop(&mut self) { - debug!("Drop {:?}", self); - } -} diff --git a/streams/src/rangefilter2.rs b/streams/src/rangefilter2.rs index ef3e784..288f74a 100644 --- a/streams/src/rangefilter2.rs +++ b/streams/src/rangefilter2.rs @@ -5,7 +5,7 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StatsItem; use items_0::streamitem::StreamItem; -use items_2::merger::MergeError; +use items_0::MergeError; use items_2::merger::Mergeable; use netpod::log::*; use netpod::NanoRange; @@ -152,7 +152,7 @@ where use Poll::*; loop { break if self.complete { - panic!("poll_next on complete"); + Ready(Some(Err(Error::with_msg_no_trace("poll_next on complete")))) } else if self.done { self.complete = true; Ready(None) diff --git a/streams/src/test.rs b/streams/src/test.rs index 9f1550d..f65a7d1 100644 --- a/streams/src/test.rs +++ b/streams/src/test.rs @@ -8,6 +8,7 @@ use futures_util::stream; use futures_util::Stream; use items_0::streamitem::sitem_data; use items_0::streamitem::Sitemty; +use items_0::Appendable; use items_0::Empty; use items_2::channelevents::ChannelEvents; use items_2::eventsdim0::EventsDim0;
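
The relocated TryFrom<&SeriesRange> for NanoRange pairs with the existing From<NanoRange> for SeriesRange: a time range converts in both directions, a pulse range only forward. A minimal sketch of that contract (a hypothetical test, not part of the patch):

use netpod::timeunits::SEC;
use netpod::NanoRange;
use netpod::PulseRange;
use netpod::SeriesRange;

#[test]
fn series_range_conversions() {
    let nr = NanoRange { beg: SEC, end: 10 * SEC };
    // Infallible direction: a NanoRange is always a valid SeriesRange.
    let sr: SeriesRange = nr.clone().into();
    // Fallible direction: only the TimeRange variant converts back.
    let back = NanoRange::try_from(&sr).unwrap();
    assert_eq!(back.beg, nr.beg);
    assert_eq!(back.end, nr.end);
    let pr = SeriesRange::PulseRange(PulseRange { beg: 1, end: 2 });
    assert!(NanoRange::try_from(&pr).is_err());
}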
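Mergeable::drain_into, and the Events counterpart used in eventsdim0.rs above, takes a half-open index range (beg, end) and moves that slice of events into dst, as the self.tss.drain(r) calls in the new EventFull impl show. A sketch of the intended semantics against EventsDim0 (hypothetical test, not part of the patch; assumes the Events trait object exposes drain_into and WithLen as used above):

use crate::eventsdim0::EventsDim0;
use items_0::Appendable;
use items_0::Empty;
use items_0::Events;
use items_0::WithLen;

#[test]
fn drain_into_moves_half_open_range() {
    let mut src = EventsDim0::empty();
    src.push(100, 1, 1.0f32);
    src.push(200, 2, 2.0);
    src.push(300, 3, 3.0);
    let mut dst: Box<dyn Events> = Box::new(EventsDim0::<f32>::empty());
    // Range (0, 2) is half-open: indices 0 and 1 move, index 2 stays.
    assert!(src.drain_into(&mut dst, (0, 2)).is_ok());
    assert_eq!(src.len(), 1);
    assert_eq!(dst.len(), 2);
}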
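The three index-search helpers on Mergeable have easily mis-read boundary semantics; the EventFull impl above scans forward for gt/ge and backward for lt. The same contract on a plain timestamp slice, for reference (standalone illustration only; the find_* functions here merely mirror that logic):

fn find_lowest_index_gt(tss: &[u64], ts: u64) -> Option<usize> {
    // first index whose timestamp is strictly greater than ts
    tss.iter().position(|&m| m > ts)
}

fn find_lowest_index_ge(tss: &[u64], ts: u64) -> Option<usize> {
    // first index whose timestamp is greater than or equal to ts
    tss.iter().position(|&m| m >= ts)
}

fn find_highest_index_lt(tss: &[u64], ts: u64) -> Option<usize> {
    // last index whose timestamp is strictly smaller than ts
    tss.iter().rposition(|&m| m < ts)
}

#[test]
fn index_search_boundaries() {
    let tss = [10u64, 20, 20, 30];
    assert_eq!(find_lowest_index_ge(&tss, 20), Some(1));
    assert_eq!(find_lowest_index_gt(&tss, 20), Some(3));
    assert_eq!(find_highest_index_lt(&tss, 20), Some(0));
}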
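With the collect_c/collect_s unification, streams::collect::collect drives any stream of Collectable + WithLen items through its Collector and returns the boxed Collected result. A usage sketch (hypothetical; assumes the deadline parameter accepts a std::time::Instant and that a tokio test runtime is available):

use std::time::Duration;
use std::time::Instant;
use futures_util::stream;
use items_0::streamitem::sitem_data;
use items_0::Appendable;
use items_0::Empty;
use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use streams::collect::collect;

#[tokio::test]
async fn collect_channel_events() -> Result<(), err::Error> {
    let mut evs = EventsDim0::empty();
    evs.push(123, 4, 1.00f32);
    let inp = stream::iter(vec![sitem_data(ChannelEvents::Events(Box::new(evs)))]);
    let deadline = Instant::now() + Duration::from_millis(2000);
    // events_max bounds how many events are gathered before truncation;
    // range and binrange stay None for a plain, unbinned collection.
    let _collected = collect(inp, deadline, 10000, None, None).await?;
    Ok(())
}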