From eebf8665ce0b626d19d27b4985993e51e4895a5c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 15 Nov 2022 16:16:16 +0100 Subject: [PATCH] WIP refactor data event pipeline --- disk/src/binned.rs | 4 +- disk/src/channelexec.rs | 108 +-------- httpret/src/events.rs | 58 ++--- httpret/src/evinfo.rs | 2 +- items/Cargo.toml | 3 +- items/src/items.rs | 4 + items/src/streams.rs | 101 ++++++++ items_2/src/binsdim0.rs | 13 +- items_2/src/eventsdim0.rs | 9 +- items_2/src/items_2.rs | 88 ++++++- items_2/src/streams.rs | 13 +- items_2/src/test.rs | 6 +- nodenet/Cargo.toml | 4 +- streams/Cargo.toml | 3 +- streams/src/collect.rs | 114 +++++++++ streams/src/eventchunker.rs | 5 +- streams/src/frames/eventsfromframes.rs | 3 +- streams/src/frames/inmem.rs | 3 +- streams/src/lib.rs | 3 + streams/src/merge.rs | 50 ++++ streams/src/merge/mergedstream.rs | 308 +++++++++++++++++++++++++ streams/src/plaineventsjson.rs | 73 ++++++ streams/src/rangefilter.rs | 3 +- streams/src/tcprawclient.rs | 2 +- 24 files changed, 800 insertions(+), 180 deletions(-) create mode 100644 streams/src/collect.rs create mode 100644 streams/src/merge.rs create mode 100644 streams/src/merge/mergedstream.rs create mode 100644 streams/src/plaineventsjson.rs diff --git a/disk/src/binned.rs b/disk/src/binned.rs index ce17c89..63f18e7 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -7,7 +7,7 @@ pub mod query; use crate::agg::binnedt::TBinnerStream; use crate::binned::binnedfrompbv::BinnedFromPreBinned; use crate::binnedstream::BoxedStream; -use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction}; +use crate::channelexec::{channel_exec, ChannelExecFunction}; use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; use crate::merge::mergedfromremotes::MergedFromRemotes; use bytes::Bytes; @@ -15,7 +15,7 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::numops::NumOps; -use items::streams::{Collectable, Collector}; +use items::streams::{collect_plain_events_json, Collectable, Collector}; use items::{ Clearable, EventsNodeProcessor, FilterFittingInside, Framable, FrameDecodable, FrameType, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType, WithLen, diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 5c602c7..a86a669 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -1,8 +1,6 @@ use crate::agg::enp::Identity; -use crate::decode::{ - BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, - LittleEndian, NumFromBytes, -}; +use crate::decode::{BigEndian, Endianness, EventValueFromBytes, EventValueShape, LittleEndian, NumFromBytes}; +use crate::decode::{EventValuesDim0Case, EventValuesDim1Case}; use crate::merge::mergedfromremotes::MergedFromRemotes; use bytes::Bytes; use err::Error; @@ -11,20 +9,15 @@ use futures_util::future::FutureExt; use futures_util::StreamExt; use items::numops::{BoolNum, NumOps, StringNum}; use items::scalarevents::ScalarEvents; -use items::streams::{Collectable, Collector}; -use items::{ - Clearable, EventsNodeProcessor, Framable, FrameType, FrameTypeStatic, PushableIndex, RangeCompletableItem, Sitemty, - StreamItem, TimeBinnableType, -}; -use netpod::log::*; +use items::streams::{collect_plain_events_json, Collectable}; +use items::{Clearable, EventsNodeProcessor, Framable, FrameType, FrameTypeStatic}; +use items::{PushableIndex, Sitemty, TimeBinnableType}; use netpod::query::{PlainEventsQuery, 
RawEventsQuery}; use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; use serde::de::DeserializeOwned; -use serde_json::Value as JsonValue; use std::fmt::Debug; use std::pin::Pin; use std::time::Duration; -use tokio::time::timeout_at; pub trait ChannelExecFunction { type Output; @@ -316,97 +309,6 @@ impl PlainEventsJson { } } -// TODO rename, it is also used for binned: -pub async fn collect_plain_events_json( - stream: S, - timeout: Duration, - bin_count_exp: u32, - events_max: u64, - do_log: bool, -) -> Result -where - S: Stream> + Unpin, - T: Collectable + Debug, -{ - let deadline = tokio::time::Instant::now() + timeout; - // TODO in general a Collector does not need to know about the expected number of bins. - // It would make more sense for some specific Collector kind to know. - // Therefore introduce finer grained types. - let mut collector = ::new_collector(bin_count_exp); - let mut i1 = 0; - let mut stream = stream; - let mut total_duration = Duration::ZERO; - loop { - let item = if i1 == 0 { - stream.next().await - } else { - if false { - None - } else { - match timeout_at(deadline, stream.next()).await { - Ok(k) => k, - Err(_) => { - collector.set_timed_out(); - None - } - } - } - }; - match item { - Some(item) => { - match item { - Ok(item) => match item { - StreamItem::Log(item) => { - if do_log { - debug!("collect_plain_events_json log {:?}", item); - } - } - StreamItem::Stats(item) => match item { - // TODO factor and simplify the stats collection: - items::StatsItem::EventDataReadStats(_) => {} - items::StatsItem::RangeFilterStats(_) => {} - items::StatsItem::DiskStats(item) => match item { - netpod::DiskStats::OpenStats(k) => { - total_duration += k.duration; - } - netpod::DiskStats::SeekStats(k) => { - total_duration += k.duration; - } - netpod::DiskStats::ReadStats(k) => { - total_duration += k.duration; - } - netpod::DiskStats::ReadExactStats(k) => { - total_duration += k.duration; - } - }, - }, - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - collector.set_range_complete(); - } - RangeCompletableItem::Data(item) => { - collector.ingest(&item); - i1 += 1; - if i1 >= events_max { - break; - } - } - }, - }, - Err(e) => { - // TODO Need to use some flags to get good enough error message for remote user. 
- Err(e)?; - } - }; - } - None => break, - } - } - let ret = serde_json::to_value(collector.result()?)?; - debug!("Total duration: {:?}", total_duration); - Ok(ret) -} - impl ChannelExecFunction for PlainEventsJson { type Output = Pin> + Send>>; diff --git a/httpret/src/events.rs b/httpret/src/events.rs index f146a24..3fa3135 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -6,8 +6,8 @@ use http::{Method, Request, Response, StatusCode}; use hyper::Body; use items_2::merger::ChannelEventsMerger; use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2, ChannelEvents}; -use netpod::log::*; use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery}; +use netpod::{log::*, HasBackend}; use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; use scyllaconn::create_scy_session; @@ -116,31 +116,35 @@ async fn plain_events_json( let query = query; // --- - let op = disk::channelexec::PlainEventsJson::new( - // TODO pass only the query, not channel, range again: - query.clone(), - query.channel().clone(), - query.range().clone(), - query.timeout(), - node_config.clone(), - query.events_max().unwrap_or(u64::MAX), - query.do_log(), - ); - let s = disk::channelexec::channel_exec( - op, - query.channel(), - query.range(), - chconf.scalar_type, - chconf.shape, - AggKind::Plain, - node_config, - ) - .await?; - let ret = response(StatusCode::OK).body(BodyStream::wrapped( - s.map_err(Error::from), - format!("plain_events_json"), - ))?; - Ok(ret) + if query.backend() == "testbackend" { + err::todoval() + } else { + let op = disk::channelexec::PlainEventsJson::new( + // TODO pass only the query, not channel, range again: + query.clone(), + query.channel().clone(), + query.range().clone(), + query.timeout(), + node_config.clone(), + query.events_max().unwrap_or(u64::MAX), + query.do_log(), + ); + let s = disk::channelexec::channel_exec( + op, + query.channel(), + query.range(), + chconf.scalar_type, + chconf.shape, + AggKind::Plain, + node_config, + ) + .await?; + let ret = response(StatusCode::OK).body(BodyStream::wrapped( + s.map_err(Error::from), + format!("plain_events_json"), + ))?; + Ok(ret) + } } pub struct EventsHandlerScylla {} @@ -248,7 +252,7 @@ impl EventsHandlerScylla { Ok(k) => match k { ChannelEvents::Events(mut item) => { if coll.is_none() { - coll = Some(item.new_collector(0)); + coll = Some(item.new_collector()); } let cl = coll.as_mut().unwrap(); cl.ingest(item.as_collectable_mut()); diff --git a/httpret/src/evinfo.rs b/httpret/src/evinfo.rs index f5afcc7..39eda55 100644 --- a/httpret/src/evinfo.rs +++ b/httpret/src/evinfo.rs @@ -3,7 +3,6 @@ use crate::err::Error; use crate::response; use bytes::Bytes; use disk::channelexec::channel_exec; -use disk::channelexec::collect_plain_events_json; use disk::channelexec::ChannelExecFunction; use disk::decode::Endianness; use disk::decode::EventValueFromBytes; @@ -16,6 +15,7 @@ use futures_util::TryStreamExt; use http::{Method, StatusCode}; use hyper::{Body, Request, Response}; use items::numops::NumOps; +use items::streams::collect_plain_events_json; use items::streams::Collectable; use items::Clearable; use items::EventsNodeProcessor; diff --git a/items/Cargo.toml b/items/Cargo.toml index 05d0a8e..47e55b2 100644 --- a/items/Cargo.toml +++ b/items/Cargo.toml @@ -8,6 +8,8 @@ edition = "2021" path = "src/items.rs" [dependencies] +tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +futures-util = "0.3.15" 
serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" @@ -17,7 +19,6 @@ bytes = "1.2.1" num-traits = "0.2.15" chrono = { version = "0.4.22", features = ["serde"] } crc32fast = "1.3.2" -tokio = { version = "1.20.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } err = { path = "../err" } items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } diff --git a/items/src/items.rs b/items/src/items.rs index 3345f87..d70d2b6 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -99,6 +99,10 @@ impl LogItem { pub type Sitemty = Result>, Error>; +pub fn sitem_data(x: X) -> Sitemty { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) +} + struct VisitLevel; impl<'de> Visitor<'de> for VisitLevel { diff --git a/items/src/streams.rs b/items/src/streams.rs index 2c119a0..3494896 100644 --- a/items/src/streams.rs +++ b/items/src/streams.rs @@ -1,6 +1,12 @@ use crate::{RangeCompletableItem, Sitemty, StreamItem, WithLen}; use err::Error; +use futures_util::{Stream, StreamExt}; +use netpod::log::*; use serde::Serialize; +use serde_json::Value as JsonValue; +use std::fmt; +use std::time::Duration; +use tokio::time::timeout_at; pub trait Collector: Send + Unpin + WithLen { type Input: Collectable; @@ -45,3 +51,98 @@ impl ToJsonResult for Sitemty { } } } + +// TODO rename, it is also used for binned: +pub async fn collect_plain_events_json( + stream: S, + timeout: Duration, + bin_count_exp: u32, + events_max: u64, + do_log: bool, +) -> Result +where + S: Stream> + Unpin, + T: Collectable + fmt::Debug, +{ + let deadline = tokio::time::Instant::now() + timeout; + // TODO in general a Collector does not need to know about the expected number of bins. + // It would make more sense for some specific Collector kind to know. + // Therefore introduce finer grained types. + let mut collector = ::new_collector(bin_count_exp); + let mut i1 = 0; + let mut stream = stream; + let mut total_duration = Duration::ZERO; + loop { + let item = if i1 == 0 { + stream.next().await + } else { + if false { + None + } else { + match timeout_at(deadline, stream.next()).await { + Ok(k) => k, + Err(_) => { + collector.set_timed_out(); + None + } + } + } + }; + match item { + Some(item) => { + match item { + Ok(item) => match item { + StreamItem::Log(item) => { + if do_log { + debug!("collect_plain_events_json log {:?}", item); + } + } + StreamItem::Stats(item) => { + use crate::StatsItem; + use netpod::DiskStats; + match item { + // TODO factor and simplify the stats collection: + StatsItem::EventDataReadStats(_) => {} + StatsItem::RangeFilterStats(_) => {} + StatsItem::DiskStats(item) => match item { + DiskStats::OpenStats(k) => { + total_duration += k.duration; + } + DiskStats::SeekStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadExactStats(k) => { + total_duration += k.duration; + } + }, + } + } + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + collector.set_range_complete(); + } + RangeCompletableItem::Data(item) => { + collector.ingest(&item); + i1 += 1; + if i1 >= events_max { + break; + } + } + }, + }, + Err(e) => { + // TODO Need to use some flags to get good enough error message for remote user. 
+ Err(e)?; + } + }; + } + None => break, + } + } + let ret = serde_json::to_value(collector.result()?)?; + debug!("Total duration: {:?}", total_duration); + Ok(ret) +} diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 6743f0b..7053c46 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -231,16 +231,14 @@ pub struct BinsDim0Collector { timed_out: bool, range_complete: bool, vals: BinsDim0, - bin_count_exp: u32, } impl BinsDim0Collector { - pub fn new(bin_count_exp: u32) -> Self { + pub fn new() -> Self { Self { timed_out: false, range_complete: false, vals: BinsDim0::::empty(), - bin_count_exp, } } } @@ -274,11 +272,12 @@ impl CollectorType for BinsDim0Collector { } fn result(&mut self) -> Result { + let bin_count_exp = 0; let bin_count = self.vals.ts1s.len() as u32; - let (missing_bins, continue_at, finished_at) = if bin_count < self.bin_count_exp { + let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp { match self.vals.ts2s.back() { Some(&k) => { - let missing_bins = self.bin_count_exp - bin_count; + let missing_bins = bin_count_exp - bin_count; let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64; let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); @@ -323,8 +322,8 @@ impl CollectorType for BinsDim0Collector { impl CollectableType for BinsDim0 { type Collector = BinsDim0Collector; - fn new_collector(bin_count_exp: u32) -> Self::Collector { - Self::Collector::new(bin_count_exp) + fn new_collector() -> Self::Collector { + Self::Collector::new() } } diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 0f29c88..344ef57 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -115,17 +115,14 @@ pub struct EventsDim0Collector { vals: EventsDim0, range_complete: bool, timed_out: bool, - #[allow(unused)] - bin_count_exp: u32, } impl EventsDim0Collector { - pub fn new(bin_count_exp: u32) -> Self { + pub fn new() -> Self { Self { vals: EventsDim0::empty(), range_complete: false, timed_out: false, - bin_count_exp, } } } @@ -206,8 +203,8 @@ impl CollectorType for EventsDim0Collector { impl CollectableType for EventsDim0 { type Collector = EventsDim0Collector; - fn new_collector(bin_count_exp: u32) -> Self::Collector { - Self::Collector::new(bin_count_exp) + fn new_collector() -> Self::Collector { + Self::Collector::new() } } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index bb67773..77774e3 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -310,6 +310,71 @@ impl PartialEq for Box { } } +struct EventsCollector {} + +impl WithLen for EventsCollector { + fn len(&self) -> usize { + todo!() + } +} + +impl Collector for EventsCollector { + fn ingest(&mut self, src: &mut dyn Collectable) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn set_timed_out(&mut self) { + todo!() + } + + fn result(&mut self) -> Result, err::Error> { + todo!() + } +} + +impl Collectable for Box { + fn new_collector(&self) -> Box { + Box::new(EventsCollector {}) + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + todo!() + } +} + +/*impl crate::streams::CollectorType for EventsCollector { + type Input = dyn Events; + type Output = Box; + + fn ingest(&mut self, src: &mut Self::Input) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn set_timed_out(&mut self) { + todo!() + } + + fn result(&mut self) -> Result { + todo!() + } +}*/ + +/*impl 
crate::streams::CollectableType for dyn Events { + type Collector = EventsCollector; + + fn new_collector() -> Self::Collector { + todo!() + } +}*/ + /// Data in time-binned form. pub trait TimeBinned: Any + TimeBinnable { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; @@ -506,6 +571,10 @@ pub enum ChannelEvents { Status(ConnStatusEvent), } +impl FrameTypeInnerStatic for ChannelEvents { + const FRAME_TYPE_ID: u32 = items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; +} + impl Clone for ChannelEvents { fn clone(&self) -> Self { match self { @@ -663,14 +732,10 @@ impl MergableEvents for ChannelEvents { } } -impl FrameTypeInnerStatic for ChannelEvents { - const FRAME_TYPE_ID: u32 = items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; -} - // TODO do this with some blanket impl: impl Collectable for Box { - fn new_collector(&self, bin_count_exp: u32) -> Box { - Collectable::new_collector(self.as_ref(), bin_count_exp) + fn new_collector(&self) -> Box { + Collectable::new_collector(self.as_ref()) } fn as_any_mut(&mut self) -> &mut dyn Any { @@ -681,7 +746,6 @@ impl Collectable for Box { fn flush_binned( binner: &mut Box, coll: &mut Option>, - bin_count_exp: u32, force: bool, ) -> Result<(), Error> { trace!("flush_binned bins_ready_count: {}", binner.bins_ready_count()); @@ -699,7 +763,7 @@ fn flush_binned( Some(mut ready) => { trace!("binned_collected ready {ready:?}"); if coll.is_none() { - *coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); + *coll = Some(ready.as_collectable_mut().new_collector()); } let cl = coll.as_mut().unwrap(); cl.ingest(ready.as_collectable_mut()); @@ -728,6 +792,8 @@ pub async fn binned_collected( let ts_edges_max = *edges.last().unwrap(); let deadline = Instant::now() + timeout; let mut did_timeout = false; + // TODO use a trait to allow check of unfinished data [hcn2956jxhwsf] + #[allow(unused)] let bin_count_exp = edges.len().max(2) as u32 - 1; let do_time_weight = agg_kind.do_time_weighted(); // TODO maybe TimeBinner should take all ChannelEvents and handle this? 
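// Illustrative sketch (not introduced by this patch): how the trait-object collector
// API is driven once bin_count_exp is gone, mirroring the flush_binned() change in
// this file. The items_2::TimeBinner and items_2::streams::Collector paths are
// assumed from the hunks above.
fn take_ready_bins(
    binner: &mut Box<dyn items_2::TimeBinner>,
    coll: &mut Option<Box<dyn items_2::streams::Collector>>,
) {
    if binner.bins_ready_count() > 0 {
        if let Some(mut ready) = binner.bins_ready() {
            // The collector is now created lazily from the first ready batch and no
            // longer receives an expected-bin-count hint.
            if coll.is_none() {
                *coll = Some(ready.as_collectable_mut().new_collector());
            }
            coll.as_mut().unwrap().ingest(ready.as_collectable_mut());
        }
    }
}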
@@ -772,7 +838,7 @@ pub async fn binned_collected( } let binner = binner.as_mut().unwrap(); binner.ingest(events.as_time_binnable()); - flush_binned(binner, &mut coll, bin_count_exp, false)?; + flush_binned(binner, &mut coll, false)?; } } ChannelEvents::Status(item) => { @@ -804,10 +870,10 @@ pub async fn binned_collected( binner.cycle(); } trace!("flush binned"); - flush_binned(&mut binner, &mut coll, bin_count_exp, false)?; + flush_binned(&mut binner, &mut coll, false)?; if coll.is_none() { debug!("force a bin"); - flush_binned(&mut binner, &mut coll, bin_count_exp, true)?; + flush_binned(&mut binner, &mut coll, true)?; } else { trace!("coll is already some"); } diff --git a/items_2/src/streams.rs b/items_2/src/streams.rs index 73735d8..df929ab 100644 --- a/items_2/src/streams.rs +++ b/items_2/src/streams.rs @@ -23,12 +23,11 @@ pub trait Collector: Send + Unpin + WithLen { pub trait CollectableType { type Collector: CollectorType; - - fn new_collector(bin_count_exp: u32) -> Self::Collector; + fn new_collector() -> Self::Collector; } pub trait Collectable: Any { - fn new_collector(&self, bin_count_exp: u32) -> Box; + fn new_collector(&self) -> Box; fn as_any_mut(&mut self) -> &mut dyn Any; } @@ -53,8 +52,8 @@ impl Collector for T { } impl Collectable for T { - fn new_collector(&self, bin_count_exp: u32) -> Box { - Box::new(T::new_collector(bin_count_exp)) as _ + fn new_collector(&self) -> Box { + Box::new(T::new_collector()) as _ } fn as_any_mut(&mut self) -> &mut dyn Any { @@ -69,11 +68,13 @@ pub trait ToJsonBytes { } // TODO check usage of this trait -pub trait ToJsonResult: fmt::Debug + Send { +pub trait ToJsonResult: erased_serde::Serialize + fmt::Debug + Send { fn to_json_result(&self) -> Result, Error>; fn as_any(&self) -> &dyn Any; } +erased_serde::serialize_trait_object!(ToJsonResult); + impl ToJsonResult for serde_json::Value { fn to_json_result(&self) -> Result, Error> { Ok(Box::new(self.clone())) diff --git a/items_2/src/test.rs b/items_2/src/test.rs index 3da462e..a328714 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -185,6 +185,8 @@ fn bin01() { let mut coll = None; let mut binner = None; let edges: Vec<_> = (0..10).into_iter().map(|t| SEC * 10 * t).collect(); + // TODO implement continue-at [hcn2956jxhwsf] + #[allow(unused)] let bin_count_exp = (edges.len() - 1) as u32; let do_time_weight = true; while let Some(item) = stream.next().await { @@ -207,7 +209,7 @@ fn bin01() { Some(mut ready) => { eprintln!("ready {ready:?}"); if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); + coll = Some(ready.as_collectable_mut().new_collector()); } let cl = coll.as_mut().unwrap(); cl.ingest(ready.as_collectable_mut()); @@ -240,7 +242,7 @@ fn bin01() { Some(mut ready) => { eprintln!("ready {ready:?}"); if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); + coll = Some(ready.as_collectable_mut().new_collector()); } let cl = coll.as_mut().unwrap(); cl.ingest(ready.as_collectable_mut()); diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index c3c1742..70669f1 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -12,8 +12,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" chrono = { version = "0.4.19", features = ["serde"] } -tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -tokio-stream = {version = "0.1.5", features = ["fs"]} +tokio = { version = "1.21.2", features = ["io-util", "net", 
"time", "sync"] } +#tokio-stream = {version = "0.1.5", features = ["fs"]} async-channel = "1.6" bytes = "1.0.1" crc32fast = "1.2.1" diff --git a/streams/Cargo.toml b/streams/Cargo.toml index 9955072..8f9156c 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -7,11 +7,11 @@ edition = "2021" [dependencies] tokio = { version = "1.21.2", features = ["io-util", "net", "time", "sync", "fs"] } tracing = "0.1.26" -futures-core = "0.3.15" futures-util = "0.3.15" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" +erased-serde = "0.3.23" bincode = "1.3.3" bytes = "1.0.1" arrayref = "0.3.6" @@ -21,5 +21,6 @@ chrono = { version = "0.4.19", features = ["serde"] } err = { path = "../err" } netpod = { path = "../netpod" } items = { path = "../items" } +items_2 = { path = "../items_2" } parse = { path = "../parse" } bitshuffle = { path = "../bitshuffle" } diff --git a/streams/src/collect.rs b/streams/src/collect.rs new file mode 100644 index 0000000..6af8faa --- /dev/null +++ b/streams/src/collect.rs @@ -0,0 +1,114 @@ +use err::Error; +use futures_util::{Stream, StreamExt}; +use items::{RangeCompletableItem, Sitemty, StreamItem}; +use items_2::streams::{Collectable, Collector}; +use netpod::log::*; +use serde::Serialize; +use serde_json::Value as JsonValue; +use std::fmt; +use std::time::Duration; + +// This is meant to work with trait object event containers (crate items_2) + +// TODO rename, it is also used for binned: +pub async fn collect_plain_events_json( + stream: S, + timeout: Duration, + events_max: u64, + do_log: bool, +) -> Result +where + S: Stream> + Unpin, + T: Collectable + fmt::Debug, +{ + let deadline = tokio::time::Instant::now() + timeout; + // TODO in general a Collector does not need to know about the expected number of bins. + // It would make more sense for some specific Collector kind to know. + // Therefore introduce finer grained types. 
+ let mut collector: Option> = None; + let mut i1 = 0; + let mut stream = stream; + let mut total_duration = Duration::ZERO; + loop { + let item = if i1 == 0 { + stream.next().await + } else { + if false { + None + } else { + match tokio::time::timeout_at(deadline, stream.next()).await { + Ok(k) => k, + Err(_) => { + eprintln!("TODO [smc3j3rwha732ru8wcnfgi]"); + err::todo(); + //collector.set_timed_out(); + None + } + } + } + }; + match item { + Some(item) => { + match item { + Ok(item) => match item { + StreamItem::Log(item) => { + if do_log { + debug!("collect_plain_events_json log {:?}", item); + } + } + StreamItem::Stats(item) => { + use items::StatsItem; + use netpod::DiskStats; + match item { + // TODO factor and simplify the stats collection: + StatsItem::EventDataReadStats(_) => {} + StatsItem::RangeFilterStats(_) => {} + StatsItem::DiskStats(item) => match item { + DiskStats::OpenStats(k) => { + total_duration += k.duration; + } + DiskStats::SeekStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadExactStats(k) => { + total_duration += k.duration; + } + }, + } + } + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + eprintln!("TODO [73jdfcgf947d]"); + err::todo(); + //collector.set_range_complete(); + } + RangeCompletableItem::Data(item) => { + eprintln!("TODO [nx298nu98venusfc8]"); + err::todo(); + //collector.ingest(&item); + i1 += 1; + if i1 >= events_max { + break; + } + } + }, + }, + Err(e) => { + // TODO Need to use some flags to get good enough error message for remote user. + Err(e)?; + } + }; + } + None => break, + } + } + let res = collector + .ok_or_else(|| Error::with_msg_no_trace(format!("no collector created")))? 
+ .result()?; + let ret = serde_json::to_value(&res)?; + debug!("Total duration: {:?}", total_duration); + Ok(ret) +} diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs index ee5d67c..7ff7302 100644 --- a/streams/src/eventchunker.rs +++ b/streams/src/eventchunker.rs @@ -5,10 +5,7 @@ use bytes::{Buf, BytesMut}; use err::Error; use futures_util::{Stream, StreamExt}; use items::eventfull::EventFull; -use items::{ - RangeCompletableItem, StatsItem, - StreamItem, WithLen, -}; +use items::{RangeCompletableItem, StatsItem, StreamItem, WithLen}; use netpod::histo::HistoLog2; use netpod::log::*; use netpod::timeunits::SEC; diff --git a/streams/src/frames/eventsfromframes.rs b/streams/src/frames/eventsfromframes.rs index b71fb48..ad7c823 100644 --- a/streams/src/frames/eventsfromframes.rs +++ b/streams/src/frames/eventsfromframes.rs @@ -1,6 +1,5 @@ use super::inmem::InMemoryFrameAsyncReadStream; -use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{Stream, StreamExt}; use items::frame::decode_frame; use items::{FrameTypeInnerStatic, Sitemty, StreamItem}; use netpod::log::*; diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs index 0de1b1d..18a6b5b 100644 --- a/streams/src/frames/inmem.rs +++ b/streams/src/frames/inmem.rs @@ -1,7 +1,6 @@ use bytes::{BufMut, BytesMut}; use err::Error; -use futures_core::Stream; -use futures_util::pin_mut; +use futures_util::{pin_mut, Stream}; use items::inmem::InMemoryFrame; use items::StreamItem; use items::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; diff --git a/streams/src/lib.rs b/streams/src/lib.rs index 2000417..751ac3f 100644 --- a/streams/src/lib.rs +++ b/streams/src/lib.rs @@ -1,7 +1,10 @@ +pub mod collect; pub mod dtflags; pub mod eventchunker; pub mod filechunkread; pub mod frames; +pub mod merge; pub mod needminbuffer; +pub mod plaineventsjson; pub mod rangefilter; pub mod tcprawclient; diff --git a/streams/src/merge.rs b/streams/src/merge.rs new file mode 100644 index 0000000..a83f4f2 --- /dev/null +++ b/streams/src/merge.rs @@ -0,0 +1,50 @@ +// Sets up the raw tcp connections: disk::merge::mergedfromremotes::MergedFromRemotes +// and then sets up a disk::merge::MergedStream + +pub mod mergedstream; + +use crate::frames::eventsfromframes::EventsFromFrames; +use crate::frames::inmem::InMemoryFrameAsyncReadStream; +use err::Error; +use futures_util::Stream; +use items::frame::make_frame; +use items::frame::make_term_frame; +use items::sitem_data; +use items::EventQueryJsonStringFrame; +use items::Sitemty; +use items_2::ChannelEvents; +use netpod::log::*; +use netpod::Cluster; +use std::pin::Pin; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; + +pub type ChannelEventsBoxedStream = Pin> + Send>>; + +pub async fn open_tcp_streams( + query: &dyn erased_serde::Serialize, + cluster: &Cluster, +) -> Result, Error> { + // TODO when unit tests established, change to async connect: + let mut streams = Vec::new(); + for node in &cluster.nodes { + debug!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw); + let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; + let qjs = serde_json::to_string(&query)?; + let (netin, mut netout) = net.into_split(); + let item = EventQueryJsonStringFrame(qjs); + let item = sitem_data(item); + let buf = make_frame(&item)?; + netout.write_all(&buf).await?; + let buf = make_term_frame()?; + netout.write_all(&buf).await?; + netout.flush().await?; + netout.forget(); + // TODO for images, we need larger buffer 
capacity + let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 128); + type ITEM = ChannelEvents; + let stream = EventsFromFrames::<_, ITEM>::new(frames); + streams.push(Box::pin(stream) as _); + } + Ok(streams) +} diff --git a/streams/src/merge/mergedstream.rs b/streams/src/merge/mergedstream.rs new file mode 100644 index 0000000..9bc5c18 --- /dev/null +++ b/streams/src/merge/mergedstream.rs @@ -0,0 +1,308 @@ +use err::Error; +use futures_util::{Stream, StreamExt}; +use items::ByteEstimate; +use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithTimestamps}; +use netpod::histo::HistoLog2; +use netpod::log::*; +use netpod::ByteSize; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::{Context, Poll}; + +// TODO compare with items_2::merger::* + +const LOG_EMIT_ITEM: bool = false; + +enum MergedCurVal { + None, + Finish, + Val(T), +} + +pub struct MergedStream { + inps: Vec, + current: Vec>, + ixs: Vec, + errored: bool, + completed: bool, + batch: Option, + ts_last_emit: u64, + range_complete_observed: Vec, + range_complete_observed_all: bool, + range_complete_observed_all_emitted: bool, + data_emit_complete: bool, + batch_size: ByteSize, + batch_len_emit_histo: HistoLog2, + logitems: VecDeque, + stats_items: VecDeque, +} + +impl Drop for MergedStream { + fn drop(&mut self) { + // TODO collect somewhere + debug!( + "MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}", + self.batch_len_emit_histo + ); + } +} + +impl MergedStream +where + S: Stream> + Unpin, + ITY: Appendable + Unpin, +{ + pub fn new(inps: Vec) -> Self { + trace!("MergedStream::new"); + let n = inps.len(); + let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect(); + Self { + inps, + current: current, + ixs: vec![0; n], + errored: false, + completed: false, + batch: None, + ts_last_emit: 0, + range_complete_observed: vec![false; n], + range_complete_observed_all: false, + range_complete_observed_all_emitted: false, + data_emit_complete: false, + batch_size: ByteSize::kb(128), + batch_len_emit_histo: HistoLog2::new(0), + logitems: VecDeque::new(), + stats_items: VecDeque::new(), + } + } + + fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut pending = 0; + for i1 in 0..self.inps.len() { + match self.current[i1] { + MergedCurVal::None => { + 'l1: loop { + break match self.inps[i1].poll_next_unpin(cx) { + Ready(Some(Ok(k))) => match k { + StreamItem::Log(item) => { + self.logitems.push_back(item); + continue 'l1; + } + StreamItem::Stats(item) => { + self.stats_items.push_back(item); + continue 'l1; + } + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + self.range_complete_observed[i1] = true; + let d = self.range_complete_observed.iter().filter(|&&k| k).count(); + if d == self.range_complete_observed.len() { + self.range_complete_observed_all = true; + debug!("MergedStream range_complete d {} COMPLETE", d); + } else { + trace!("MergedStream range_complete d {}", d); + } + continue 'l1; + } + RangeCompletableItem::Data(item) => { + self.ixs[i1] = 0; + self.current[i1] = MergedCurVal::Val(item); + } + }, + }, + Ready(Some(Err(e))) => { + // TODO emit this error, consider this stream as done, anything more to do here? 
+ //self.current[i1] = CurVal::Err(e); + self.errored = true; + return Ready(Err(e)); + } + Ready(None) => { + self.current[i1] = MergedCurVal::Finish; + } + Pending => { + pending += 1; + } + }; + } + } + _ => (), + } + } + if pending > 0 { + Pending + } else { + Ready(Ok(())) + } + } +} + +impl Stream for MergedStream +where + S: Stream> + Unpin, + ITY: PushableIndex + Appendable + ByteEstimate + WithTimestamps + Unpin, +{ + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + break if self.completed { + panic!("poll_next on completed"); + } else if self.errored { + self.completed = true; + Ready(None) + } else if let Some(item) = self.logitems.pop_front() { + Ready(Some(Ok(StreamItem::Log(item)))) + } else if let Some(item) = self.stats_items.pop_front() { + Ready(Some(Ok(StreamItem::Stats(item)))) + } else if self.range_complete_observed_all_emitted { + self.completed = true; + Ready(None) + } else if self.data_emit_complete { + if self.range_complete_observed_all { + if self.range_complete_observed_all_emitted { + self.completed = true; + Ready(None) + } else { + self.range_complete_observed_all_emitted = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } + } else { + self.completed = true; + Ready(None) + } + } else { + match self.replenish(cx) { + Ready(Ok(_)) => { + let mut lowest_ix = usize::MAX; + let mut lowest_ts = u64::MAX; + for i1 in 0..self.inps.len() { + if let MergedCurVal::Val(val) = &self.current[i1] { + let u = self.ixs[i1]; + if u >= val.len() { + self.ixs[i1] = 0; + self.current[i1] = MergedCurVal::None; + continue 'outer; + } else { + let ts = val.ts(u); + if ts < lowest_ts { + lowest_ix = i1; + lowest_ts = ts; + } + } + } + } + if lowest_ix == usize::MAX { + if let Some(batch) = self.batch.take() { + if batch.len() != 0 { + self.batch_len_emit_histo.ingest(batch.len() as u32); + self.data_emit_complete = true; + if LOG_EMIT_ITEM { + let mut aa = vec![]; + for ii in 0..batch.len() { + aa.push(batch.ts(ii)); + } + debug!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa); + }; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch))))) + } else { + self.data_emit_complete = true; + continue 'outer; + } + } else { + self.data_emit_complete = true; + continue 'outer; + } + } else { + // TODO unordered cases + if lowest_ts < self.ts_last_emit { + self.errored = true; + let msg = format!( + "unordered event at lowest_ts {} ts_last_emit {}", + lowest_ts, self.ts_last_emit + ); + return Ready(Some(Err(Error::with_public_msg(msg)))); + } else { + self.ts_last_emit = self.ts_last_emit.max(lowest_ts); + } + { + let batch = self.batch.take(); + let rix = self.ixs[lowest_ix]; + match &self.current[lowest_ix] { + MergedCurVal::Val(val) => { + let mut ldst = batch.unwrap_or_else(|| val.empty_like_self()); + if false { + info!( + "Push event rix {} lowest_ix {} lowest_ts {}", + rix, lowest_ix, lowest_ts + ); + } + ldst.push_index(val, rix); + self.batch = Some(ldst); + } + MergedCurVal::None => panic!(), + MergedCurVal::Finish => panic!(), + } + } + self.ixs[lowest_ix] += 1; + let curlen = match &self.current[lowest_ix] { + MergedCurVal::Val(val) => val.len(), + MergedCurVal::None => panic!(), + MergedCurVal::Finish => panic!(), + }; + if self.ixs[lowest_ix] >= curlen { + self.ixs[lowest_ix] = 0; + self.current[lowest_ix] = MergedCurVal::None; + } + let emit_packet_now = if let Some(batch) = &self.batch { + if batch.byte_estimate() >= 
self.batch_size.bytes() as u64 { + true + } else { + false + } + } else { + false + }; + if emit_packet_now { + if let Some(batch) = self.batch.take() { + trace!("emit item because over threshold len {}", batch.len()); + self.batch_len_emit_histo.ingest(batch.len() as u32); + if LOG_EMIT_ITEM { + let mut aa = vec![]; + for ii in 0..batch.len() { + aa.push(batch.ts(ii)); + } + debug!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa); + }; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch))))) + } else { + continue 'outer; + } + } else { + continue 'outer; + } + } + } + Ready(Err(e)) => { + self.errored = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + }; + } + } +} + +#[cfg(test)] +mod test { + use items_2::{ChannelEvents, Empty}; + + #[test] + fn merge_channel_events() { + let mut evs = items_2::eventsdim0::EventsDim0::empty(); + evs.push(1, 100, 17u8); + evs.push(3, 300, 16); + let _cevs = ChannelEvents::Events(Box::new(evs)); + } +} diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs new file mode 100644 index 0000000..2659c29 --- /dev/null +++ b/streams/src/plaineventsjson.rs @@ -0,0 +1,73 @@ +use crate::merge::open_tcp_streams; +use bytes::Bytes; +use err::Error; +use futures_util::{future, stream, FutureExt, Stream, StreamExt}; +use items::streams::collect_plain_events_json; +use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem}; +use items_2::ChannelEvents; +use netpod::log::*; +use netpod::Cluster; +use serde::Serialize; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +pub struct BytesStream(Pin> + Send>>); + +impl Stream for BytesStream { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + StreamExt::poll_next_unpin(&mut self, cx) + } +} + +pub async fn plain_events_json(query: SER, cluster: &Cluster) -> Result +where + SER: Serialize, +{ + let inps = open_tcp_streams(&query, cluster).await?; + let mut merged = items_2::merger::ChannelEventsMerger::new(inps); + let timeout = Duration::from_millis(2000); + let events_max = 100; + let do_log = false; + let mut coll = None; + while let Some(item) = merged.next().await { + let item = item?; + match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => todo!(), + RangeCompletableItem::Data(item) => match item { + ChannelEvents::Events(mut item) => { + if coll.is_none() { + coll = Some(item.new_collector()); + } + let coll = coll + .as_mut() + .ok_or_else(|| Error::with_msg_no_trace(format!("no collector")))?; + coll.ingest(&mut item); + } + ChannelEvents::Status(_) => todo!(), + }, + }, + StreamItem::Log(item) => { + info!("log {item:?}"); + } + StreamItem::Stats(item) => { + info!("stats {item:?}"); + } + } + } + // TODO compare with + // streams::collect::collect_plain_events_json + // and remove duplicate functionality. + let mut coll = coll.ok_or_else(|| Error::with_msg_no_trace(format!("no collector created")))?; + let res = coll.result()?; + // TODO factor the conversion of the result out to a higher level. + // The output of this function should again be collectable, maybe even binnable and otherwise processable. 
+ let js = serde_json::to_vec(&res)?; + let item = sitem_data(Bytes::from(js)); + let stream = stream::once(future::ready(item)); + let stream = BytesStream(Box::pin(stream)); + Ok(stream) +} diff --git a/streams/src/rangefilter.rs b/streams/src/rangefilter.rs index f8a1226..bdcc604 100644 --- a/streams/src/rangefilter.rs +++ b/streams/src/rangefilter.rs @@ -1,6 +1,5 @@ use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{Stream, StreamExt}; use items::StatsItem; use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps}; use netpod::{log::*, RangeFilterStats}; diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index 5a59593..d0e518a 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -8,7 +8,7 @@ to request such data from nodes. use crate::frames::eventsfromframes::EventsFromFrames; use crate::frames::inmem::InMemoryFrameAsyncReadStream; use err::Error; -use futures_core::Stream; +use futures_util::Stream; use items::eventfull::EventFull; use items::frame::{make_frame, make_term_frame}; use items::{EventQueryJsonStringFrame, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem};
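For orientation, the new streams-side path added by this patch (streams::merge::open_tcp_streams feeding items_2::merger::ChannelEventsMerger, collected and serialized in streams::plaineventsjson) would be consumed roughly as sketched below. This is illustrative only: fetch_events_json is a hypothetical caller, and the sketch assumes PlainEventsQuery satisfies the Serialize bound required by plain_events_json.

use futures_util::StreamExt;
use items::{RangeCompletableItem, StreamItem};
use netpod::query::PlainEventsQuery;
use netpod::Cluster;

// Hypothetical caller of the new streams::plaineventsjson::plain_events_json.
async fn fetch_events_json(query: &PlainEventsQuery, cluster: &Cluster) -> Result<Vec<u8>, err::Error> {
    // Opens one raw TCP stream per node, merges the framed ChannelEvents,
    // collects them and yields the JSON document as a single Bytes data item
    // wrapped in the usual Sitemty envelope.
    let mut stream = streams::plaineventsjson::plain_events_json(query, cluster).await?;
    let mut out = Vec::new();
    while let Some(item) = stream.next().await {
        match item? {
            StreamItem::DataItem(RangeCompletableItem::Data(bytes)) => out.extend_from_slice(&bytes),
            // Log and stats items are side-band information, not part of the payload.
            _ => {}
        }
    }
    Ok(out)
}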