WIP refactor data event pipeline

This commit is contained in:
Dominik Werder
2022-11-15 16:16:16 +01:00
parent fb78f1887e
commit eebf8665ce
24 changed files with 800 additions and 180 deletions

View File

@@ -7,7 +7,7 @@ pub mod query;
use crate::agg::binnedt::TBinnerStream;
use crate::binned::binnedfrompbv::BinnedFromPreBinned;
use crate::binnedstream::BoxedStream;
use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction};
use crate::channelexec::{channel_exec, ChannelExecFunction};
use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes};
use crate::merge::mergedfromremotes::MergedFromRemotes;
use bytes::Bytes;
@@ -15,7 +15,7 @@ use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::numops::NumOps;
use items::streams::{Collectable, Collector};
use items::streams::{collect_plain_events_json, Collectable, Collector};
use items::{
Clearable, EventsNodeProcessor, FilterFittingInside, Framable, FrameDecodable, FrameType, PushableIndex,
RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType, WithLen,

View File

@@ -1,8 +1,6 @@
use crate::agg::enp::Identity;
use crate::decode::{
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
LittleEndian, NumFromBytes,
};
use crate::decode::{BigEndian, Endianness, EventValueFromBytes, EventValueShape, LittleEndian, NumFromBytes};
use crate::decode::{EventValuesDim0Case, EventValuesDim1Case};
use crate::merge::mergedfromremotes::MergedFromRemotes;
use bytes::Bytes;
use err::Error;
@@ -11,20 +9,15 @@ use futures_util::future::FutureExt;
use futures_util::StreamExt;
use items::numops::{BoolNum, NumOps, StringNum};
use items::scalarevents::ScalarEvents;
use items::streams::{Collectable, Collector};
use items::{
Clearable, EventsNodeProcessor, Framable, FrameType, FrameTypeStatic, PushableIndex, RangeCompletableItem, Sitemty,
StreamItem, TimeBinnableType,
};
use netpod::log::*;
use items::streams::{collect_plain_events_json, Collectable};
use items::{Clearable, EventsNodeProcessor, Framable, FrameType, FrameTypeStatic};
use items::{PushableIndex, Sitemty, TimeBinnableType};
use netpod::query::{PlainEventsQuery, RawEventsQuery};
use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape};
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
use std::fmt::Debug;
use std::pin::Pin;
use std::time::Duration;
use tokio::time::timeout_at;
pub trait ChannelExecFunction {
type Output;
@@ -316,97 +309,6 @@ impl PlainEventsJson {
}
}
// TODO rename, it is also used for binned:
/// Collect all data items from `stream` into a single JSON value.
///
/// Despite the name this is used for both plain event and binned queries.
/// The first item is awaited without a deadline; later items are bounded by
/// `timeout`. On timeout the collector is marked timed out and the partial
/// result is returned. Collection also stops early after `events_max` data items.
/// `bin_count_exp` is forwarded to the collector; `do_log` enables debug logging
/// of log items seen on the stream.
pub async fn collect_plain_events_json<T, S>(
stream: S,
timeout: Duration,
bin_count_exp: u32,
events_max: u64,
do_log: bool,
) -> Result<JsonValue, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: Collectable + Debug,
{
let deadline = tokio::time::Instant::now() + timeout;
// TODO in general a Collector does not need to know about the expected number of bins.
// It would make more sense for some specific Collector kind to know.
// Therefore introduce finer grained types.
let mut collector = <T as Collectable>::new_collector(bin_count_exp);
// Counts ingested data items; also decides whether the deadline applies below.
let mut i1 = 0;
let mut stream = stream;
// Sum of disk I/O durations reported via stats items, logged at the end.
let mut total_duration = Duration::ZERO;
loop {
// The first item is awaited without a deadline so that a slow stream
// start does not immediately count as a timeout.
let item = if i1 == 0 {
stream.next().await
} else {
// NOTE(review): the `if false` arm is dead code left from debugging.
if false {
None
} else {
match timeout_at(deadline, stream.next()).await {
Ok(k) => k,
Err(_) => {
// Deadline reached: record the timeout and stop consuming.
collector.set_timed_out();
None
}
}
}
};
match item {
Some(item) => {
match item {
Ok(item) => match item {
StreamItem::Log(item) => {
if do_log {
debug!("collect_plain_events_json log {:?}", item);
}
}
StreamItem::Stats(item) => match item {
// TODO factor and simplify the stats collection:
items::StatsItem::EventDataReadStats(_) => {}
items::StatsItem::RangeFilterStats(_) => {}
items::StatsItem::DiskStats(item) => match item {
netpod::DiskStats::OpenStats(k) => {
total_duration += k.duration;
}
netpod::DiskStats::SeekStats(k) => {
total_duration += k.duration;
}
netpod::DiskStats::ReadStats(k) => {
total_duration += k.duration;
}
netpod::DiskStats::ReadExactStats(k) => {
total_duration += k.duration;
}
},
},
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
collector.set_range_complete();
}
RangeCompletableItem::Data(item) => {
collector.ingest(&item);
i1 += 1;
// Cap the number of collected events.
if i1 >= events_max {
break;
}
}
},
},
Err(e) => {
// TODO Need to use some flags to get good enough error message for remote user.
Err(e)?;
}
};
}
None => break,
}
}
let ret = serde_json::to_value(collector.result()?)?;
debug!("Total duration: {:?}", total_duration);
Ok(ret)
}
impl ChannelExecFunction for PlainEventsJson {
type Output = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;

View File

@@ -6,8 +6,8 @@ use http::{Method, Request, Response, StatusCode};
use hyper::Body;
use items_2::merger::ChannelEventsMerger;
use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2, ChannelEvents};
use netpod::log::*;
use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery};
use netpod::{log::*, HasBackend};
use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached};
use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
use scyllaconn::create_scy_session;
@@ -116,31 +116,35 @@ async fn plain_events_json(
let query = query;
// ---
let op = disk::channelexec::PlainEventsJson::new(
// TODO pass only the query, not channel, range again:
query.clone(),
query.channel().clone(),
query.range().clone(),
query.timeout(),
node_config.clone(),
query.events_max().unwrap_or(u64::MAX),
query.do_log(),
);
let s = disk::channelexec::channel_exec(
op,
query.channel(),
query.range(),
chconf.scalar_type,
chconf.shape,
AggKind::Plain,
node_config,
)
.await?;
let ret = response(StatusCode::OK).body(BodyStream::wrapped(
s.map_err(Error::from),
format!("plain_events_json"),
))?;
Ok(ret)
if query.backend() == "testbackend" {
err::todoval()
} else {
let op = disk::channelexec::PlainEventsJson::new(
// TODO pass only the query, not channel, range again:
query.clone(),
query.channel().clone(),
query.range().clone(),
query.timeout(),
node_config.clone(),
query.events_max().unwrap_or(u64::MAX),
query.do_log(),
);
let s = disk::channelexec::channel_exec(
op,
query.channel(),
query.range(),
chconf.scalar_type,
chconf.shape,
AggKind::Plain,
node_config,
)
.await?;
let ret = response(StatusCode::OK).body(BodyStream::wrapped(
s.map_err(Error::from),
format!("plain_events_json"),
))?;
Ok(ret)
}
}
pub struct EventsHandlerScylla {}
@@ -248,7 +252,7 @@ impl EventsHandlerScylla {
Ok(k) => match k {
ChannelEvents::Events(mut item) => {
if coll.is_none() {
coll = Some(item.new_collector(0));
coll = Some(item.new_collector());
}
let cl = coll.as_mut().unwrap();
cl.ingest(item.as_collectable_mut());

View File

@@ -3,7 +3,6 @@ use crate::err::Error;
use crate::response;
use bytes::Bytes;
use disk::channelexec::channel_exec;
use disk::channelexec::collect_plain_events_json;
use disk::channelexec::ChannelExecFunction;
use disk::decode::Endianness;
use disk::decode::EventValueFromBytes;
@@ -16,6 +15,7 @@ use futures_util::TryStreamExt;
use http::{Method, StatusCode};
use hyper::{Body, Request, Response};
use items::numops::NumOps;
use items::streams::collect_plain_events_json;
use items::streams::Collectable;
use items::Clearable;
use items::EventsNodeProcessor;

View File

@@ -8,6 +8,8 @@ edition = "2021"
path = "src/items.rs"
[dependencies]
tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
futures-util = "0.3.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.1"
@@ -17,7 +19,6 @@ bytes = "1.2.1"
num-traits = "0.2.15"
chrono = { version = "0.4.22", features = ["serde"] }
crc32fast = "1.3.2"
tokio = { version = "1.20.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
err = { path = "../err" }
items_proc = { path = "../items_proc" }
netpod = { path = "../netpod" }

View File

@@ -99,6 +99,10 @@ impl LogItem {
pub type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, Error>;
pub fn sitem_data<X>(x: X) -> Sitemty<X> {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
}
struct VisitLevel;
impl<'de> Visitor<'de> for VisitLevel {

View File

@@ -1,6 +1,12 @@
use crate::{RangeCompletableItem, Sitemty, StreamItem, WithLen};
use err::Error;
use futures_util::{Stream, StreamExt};
use netpod::log::*;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::fmt;
use std::time::Duration;
use tokio::time::timeout_at;
pub trait Collector: Send + Unpin + WithLen {
type Input: Collectable;
@@ -45,3 +51,98 @@ impl ToJsonResult for Sitemty<serde_json::Value> {
}
}
}
// TODO rename, it is also used for binned:
/// Drain `stream` and collect its data items into a single JSON value.
///
/// Used for both plain event queries and binned queries (hence the TODO to
/// rename). The first item is awaited without a deadline; subsequent items
/// are subject to the overall `timeout` deadline. On timeout the collector
/// is marked timed-out and the partial result is returned. Collection also
/// stops once `events_max` data items have been ingested.
///
/// `bin_count_exp` is forwarded to the collector. `do_log` enables debug
/// logging of log items seen on the stream.
///
/// Returns the collector result serialized as JSON, or the first error item
/// encountered on the stream.
pub async fn collect_plain_events_json<T, S>(
    stream: S,
    timeout: Duration,
    bin_count_exp: u32,
    events_max: u64,
    do_log: bool,
) -> Result<JsonValue, Error>
where
    S: Stream<Item = Sitemty<T>> + Unpin,
    T: Collectable + fmt::Debug,
{
    let deadline = tokio::time::Instant::now() + timeout;
    // TODO in general a Collector does not need to know about the expected number of bins.
    // It would make more sense for some specific Collector kind to know.
    // Therefore introduce finer grained types.
    let mut collector = <T as Collectable>::new_collector(bin_count_exp);
    // Counts ingested data items; also decides whether the deadline applies.
    let mut i1 = 0;
    let mut stream = stream;
    // Sum of disk I/O durations reported via stats items, logged at the end.
    let mut total_duration = Duration::ZERO;
    loop {
        // The first item is awaited without a deadline so that a slow stream
        // start does not immediately count as a timeout.
        let item = if i1 == 0 {
            stream.next().await
        } else {
            match timeout_at(deadline, stream.next()).await {
                Ok(k) => k,
                Err(_) => {
                    // Deadline reached: record the timeout and stop consuming.
                    collector.set_timed_out();
                    None
                }
            }
        };
        match item {
            Some(item) => {
                match item {
                    Ok(item) => match item {
                        StreamItem::Log(item) => {
                            if do_log {
                                debug!("collect_plain_events_json log {:?}", item);
                            }
                        }
                        StreamItem::Stats(item) => {
                            use crate::StatsItem;
                            use netpod::DiskStats;
                            match item {
                                // TODO factor and simplify the stats collection:
                                StatsItem::EventDataReadStats(_) => {}
                                StatsItem::RangeFilterStats(_) => {}
                                StatsItem::DiskStats(item) => match item {
                                    DiskStats::OpenStats(k) => {
                                        total_duration += k.duration;
                                    }
                                    DiskStats::SeekStats(k) => {
                                        total_duration += k.duration;
                                    }
                                    DiskStats::ReadStats(k) => {
                                        total_duration += k.duration;
                                    }
                                    DiskStats::ReadExactStats(k) => {
                                        total_duration += k.duration;
                                    }
                                },
                            }
                        }
                        StreamItem::DataItem(item) => match item {
                            RangeCompletableItem::RangeComplete => {
                                collector.set_range_complete();
                            }
                            RangeCompletableItem::Data(item) => {
                                collector.ingest(&item);
                                i1 += 1;
                                // Cap the number of collected events.
                                if i1 >= events_max {
                                    break;
                                }
                            }
                        },
                    },
                    Err(e) => {
                        // TODO Need to use some flags to get good enough error message for remote user.
                        Err(e)?;
                    }
                };
            }
            None => break,
        }
    }
    let ret = serde_json::to_value(collector.result()?)?;
    debug!("Total duration: {:?}", total_duration);
    Ok(ret)
}

View File

@@ -231,16 +231,14 @@ pub struct BinsDim0Collector<NTY> {
timed_out: bool,
range_complete: bool,
vals: BinsDim0<NTY>,
bin_count_exp: u32,
}
impl<NTY> BinsDim0Collector<NTY> {
pub fn new(bin_count_exp: u32) -> Self {
pub fn new() -> Self {
Self {
timed_out: false,
range_complete: false,
vals: BinsDim0::<NTY>::empty(),
bin_count_exp,
}
}
}
@@ -274,11 +272,12 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
}
fn result(&mut self) -> Result<Self::Output, Error> {
let bin_count_exp = 0;
let bin_count = self.vals.ts1s.len() as u32;
let (missing_bins, continue_at, finished_at) = if bin_count < self.bin_count_exp {
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
Some(&k) => {
let missing_bins = self.bin_count_exp - bin_count;
let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
@@ -323,8 +322,8 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
impl<NTY: ScalarOps> CollectableType for BinsDim0<NTY> {
type Collector = BinsDim0Collector<NTY>;
fn new_collector(bin_count_exp: u32) -> Self::Collector {
Self::Collector::new(bin_count_exp)
fn new_collector() -> Self::Collector {
Self::Collector::new()
}
}

View File

@@ -115,17 +115,14 @@ pub struct EventsDim0Collector<NTY> {
vals: EventsDim0<NTY>,
range_complete: bool,
timed_out: bool,
#[allow(unused)]
bin_count_exp: u32,
}
impl<NTY> EventsDim0Collector<NTY> {
pub fn new(bin_count_exp: u32) -> Self {
pub fn new() -> Self {
Self {
vals: EventsDim0::empty(),
range_complete: false,
timed_out: false,
bin_count_exp,
}
}
}
@@ -206,8 +203,8 @@ impl<NTY: ScalarOps> CollectorType for EventsDim0Collector<NTY> {
impl<NTY: ScalarOps> CollectableType for EventsDim0<NTY> {
type Collector = EventsDim0Collector<NTY>;
fn new_collector(bin_count_exp: u32) -> Self::Collector {
Self::Collector::new(bin_count_exp)
fn new_collector() -> Self::Collector {
Self::Collector::new()
}
}

View File

@@ -310,6 +310,71 @@ impl PartialEq for Box<dyn Events> {
}
}
/// Placeholder collector for type-erased `Box<dyn Events>` containers.
/// WIP: every method body is still `todo!()`; only construction via
/// `Collectable::new_collector` works so far.
struct EventsCollector {}
impl WithLen for EventsCollector {
fn len(&self) -> usize {
// TODO(WIP): report the number of collected events.
todo!()
}
}
impl Collector for EventsCollector {
fn ingest(&mut self, src: &mut dyn Collectable) {
// TODO(WIP): merge `src` into the collected state.
todo!()
}
fn set_range_complete(&mut self) {
// TODO(WIP): remember that the requested range was fully covered.
todo!()
}
fn set_timed_out(&mut self) {
// TODO(WIP): remember that collection was cut short by a timeout.
todo!()
}
fn result(&mut self) -> Result<Box<dyn ToJsonResult>, err::Error> {
// TODO(WIP): produce the JSON-serializable result.
todo!()
}
}
// Allows a boxed trait-object event container to be collected like any
// concrete Collectable; currently hands out the stub collector above.
impl Collectable for Box<dyn Events> {
fn new_collector(&self) -> Box<dyn Collector> {
Box::new(EventsCollector {})
}
fn as_any_mut(&mut self) -> &mut dyn Any {
// TODO(WIP): downcast support not implemented yet.
todo!()
}
}
/*impl crate::streams::CollectorType for EventsCollector {
type Input = dyn Events;
type Output = Box<dyn Events>;
fn ingest(&mut self, src: &mut Self::Input) {
todo!()
}
fn set_range_complete(&mut self) {
todo!()
}
fn set_timed_out(&mut self) {
todo!()
}
fn result(&mut self) -> Result<Self::Output, err::Error> {
todo!()
}
}*/
/*impl crate::streams::CollectableType for dyn Events {
type Collector = EventsCollector;
fn new_collector() -> Self::Collector {
todo!()
}
}*/
/// Data in time-binned form.
pub trait TimeBinned: Any + TimeBinnable {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
@@ -506,6 +571,10 @@ pub enum ChannelEvents {
Status(ConnStatusEvent),
}
impl FrameTypeInnerStatic for ChannelEvents {
const FRAME_TYPE_ID: u32 = items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
}
impl Clone for ChannelEvents {
fn clone(&self) -> Self {
match self {
@@ -663,14 +732,10 @@ impl MergableEvents for ChannelEvents {
}
}
impl FrameTypeInnerStatic for ChannelEvents {
const FRAME_TYPE_ID: u32 = items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
}
// TODO do this with some blanket impl:
impl Collectable for Box<dyn Collectable> {
fn new_collector(&self, bin_count_exp: u32) -> Box<dyn streams::Collector> {
Collectable::new_collector(self.as_ref(), bin_count_exp)
fn new_collector(&self) -> Box<dyn streams::Collector> {
Collectable::new_collector(self.as_ref())
}
fn as_any_mut(&mut self) -> &mut dyn Any {
@@ -681,7 +746,6 @@ impl Collectable for Box<dyn Collectable> {
fn flush_binned(
binner: &mut Box<dyn TimeBinner>,
coll: &mut Option<Box<dyn Collector>>,
bin_count_exp: u32,
force: bool,
) -> Result<(), Error> {
trace!("flush_binned bins_ready_count: {}", binner.bins_ready_count());
@@ -699,7 +763,7 @@ fn flush_binned(
Some(mut ready) => {
trace!("binned_collected ready {ready:?}");
if coll.is_none() {
*coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
*coll = Some(ready.as_collectable_mut().new_collector());
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
@@ -728,6 +792,8 @@ pub async fn binned_collected(
let ts_edges_max = *edges.last().unwrap();
let deadline = Instant::now() + timeout;
let mut did_timeout = false;
// TODO use a trait to allow check of unfinished data [hcn2956jxhwsf]
#[allow(unused)]
let bin_count_exp = edges.len().max(2) as u32 - 1;
let do_time_weight = agg_kind.do_time_weighted();
// TODO maybe TimeBinner should take all ChannelEvents and handle this?
@@ -772,7 +838,7 @@ pub async fn binned_collected(
}
let binner = binner.as_mut().unwrap();
binner.ingest(events.as_time_binnable());
flush_binned(binner, &mut coll, bin_count_exp, false)?;
flush_binned(binner, &mut coll, false)?;
}
}
ChannelEvents::Status(item) => {
@@ -804,10 +870,10 @@ pub async fn binned_collected(
binner.cycle();
}
trace!("flush binned");
flush_binned(&mut binner, &mut coll, bin_count_exp, false)?;
flush_binned(&mut binner, &mut coll, false)?;
if coll.is_none() {
debug!("force a bin");
flush_binned(&mut binner, &mut coll, bin_count_exp, true)?;
flush_binned(&mut binner, &mut coll, true)?;
} else {
trace!("coll is already some");
}

View File

@@ -23,12 +23,11 @@ pub trait Collector: Send + Unpin + WithLen {
pub trait CollectableType {
type Collector: CollectorType<Input = Self>;
fn new_collector(bin_count_exp: u32) -> Self::Collector;
fn new_collector() -> Self::Collector;
}
pub trait Collectable: Any {
fn new_collector(&self, bin_count_exp: u32) -> Box<dyn Collector>;
fn new_collector(&self) -> Box<dyn Collector>;
fn as_any_mut(&mut self) -> &mut dyn Any;
}
@@ -53,8 +52,8 @@ impl<T: CollectorType + 'static> Collector for T {
}
impl<T: CollectableType + 'static> Collectable for T {
fn new_collector(&self, bin_count_exp: u32) -> Box<dyn Collector> {
Box::new(T::new_collector(bin_count_exp)) as _
fn new_collector(&self) -> Box<dyn Collector> {
Box::new(T::new_collector()) as _
}
fn as_any_mut(&mut self) -> &mut dyn Any {
@@ -69,11 +68,13 @@ pub trait ToJsonBytes {
}
// TODO check usage of this trait
pub trait ToJsonResult: fmt::Debug + Send {
pub trait ToJsonResult: erased_serde::Serialize + fmt::Debug + Send {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error>;
fn as_any(&self) -> &dyn Any;
}
erased_serde::serialize_trait_object!(ToJsonResult);
impl ToJsonResult for serde_json::Value {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
Ok(Box::new(self.clone()))

View File

@@ -185,6 +185,8 @@ fn bin01() {
let mut coll = None;
let mut binner = None;
let edges: Vec<_> = (0..10).into_iter().map(|t| SEC * 10 * t).collect();
// TODO implement continue-at [hcn2956jxhwsf]
#[allow(unused)]
let bin_count_exp = (edges.len() - 1) as u32;
let do_time_weight = true;
while let Some(item) = stream.next().await {
@@ -207,7 +209,7 @@ fn bin01() {
Some(mut ready) => {
eprintln!("ready {ready:?}");
if coll.is_none() {
coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
coll = Some(ready.as_collectable_mut().new_collector());
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
@@ -240,7 +242,7 @@ fn bin01() {
Some(mut ready) => {
eprintln!("ready {ready:?}");
if coll.is_none() {
coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
coll = Some(ready.as_collectable_mut().new_collector());
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());

View File

@@ -12,8 +12,8 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.1"
chrono = { version = "0.4.19", features = ["serde"] }
tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tokio-stream = {version = "0.1.5", features = ["fs"]}
tokio = { version = "1.21.2", features = ["io-util", "net", "time", "sync"] }
#tokio-stream = {version = "0.1.5", features = ["fs"]}
async-channel = "1.6"
bytes = "1.0.1"
crc32fast = "1.2.1"

View File

@@ -7,11 +7,11 @@ edition = "2021"
[dependencies]
tokio = { version = "1.21.2", features = ["io-util", "net", "time", "sync", "fs"] }
tracing = "0.1.26"
futures-core = "0.3.15"
futures-util = "0.3.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.1"
erased-serde = "0.3.23"
bincode = "1.3.3"
bytes = "1.0.1"
arrayref = "0.3.6"
@@ -21,5 +21,6 @@ chrono = { version = "0.4.19", features = ["serde"] }
err = { path = "../err" }
netpod = { path = "../netpod" }
items = { path = "../items" }
items_2 = { path = "../items_2" }
parse = { path = "../parse" }
bitshuffle = { path = "../bitshuffle" }

114
streams/src/collect.rs Normal file
View File

@@ -0,0 +1,114 @@
use err::Error;
use futures_util::{Stream, StreamExt};
use items::{RangeCompletableItem, Sitemty, StreamItem};
use items_2::streams::{Collectable, Collector};
use netpod::log::*;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::fmt;
use std::time::Duration;
// This is meant to work with trait object event containers (crate items_2)
// TODO rename, it is also used for binned:
/// Collect data items from `stream` into one JSON value, for trait-object
/// event containers.
///
/// WIP: the timeout, range-complete and data-ingest paths still call
/// `err::todo()`, and `collector` is never set to `Some`, so this function
/// cannot yet produce a successful result. [smc3j3rwha732ru8wcnfgi]
pub async fn collect_plain_events_json<T, S>(
stream: S,
timeout: Duration,
events_max: u64,
do_log: bool,
) -> Result<JsonValue, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: Collectable + fmt::Debug,
{
let deadline = tokio::time::Instant::now() + timeout;
// TODO in general a Collector does not need to know about the expected number of bins.
// It would make more sense for some specific Collector kind to know.
// Therefore introduce finer grained types.
let mut collector: Option<Box<dyn Collector>> = None;
// Counts ingested data items; also decides whether the deadline applies.
let mut i1 = 0;
let mut stream = stream;
// Sum of disk I/O durations reported via stats items, logged at the end.
let mut total_duration = Duration::ZERO;
loop {
// The first item is awaited without a deadline so that a slow stream
// start does not immediately count as a timeout.
let item = if i1 == 0 {
stream.next().await
} else {
// NOTE(review): the `if false` arm is dead code left from debugging.
if false {
None
} else {
match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(k) => k,
Err(_) => {
eprintln!("TODO [smc3j3rwha732ru8wcnfgi]");
err::todo();
//collector.set_timed_out();
None
}
}
}
};
match item {
Some(item) => {
match item {
Ok(item) => match item {
StreamItem::Log(item) => {
if do_log {
debug!("collect_plain_events_json log {:?}", item);
}
}
StreamItem::Stats(item) => {
use items::StatsItem;
use netpod::DiskStats;
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}
StatsItem::RangeFilterStats(_) => {}
StatsItem::DiskStats(item) => match item {
DiskStats::OpenStats(k) => {
total_duration += k.duration;
}
DiskStats::SeekStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadExactStats(k) => {
total_duration += k.duration;
}
},
}
}
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
eprintln!("TODO [73jdfcgf947d]");
err::todo();
//collector.set_range_complete();
}
RangeCompletableItem::Data(item) => {
eprintln!("TODO [nx298nu98venusfc8]");
err::todo();
//collector.ingest(&item);
i1 += 1;
// Cap the number of collected events.
if i1 >= events_max {
break;
}
}
},
},
Err(e) => {
// TODO Need to use some flags to get good enough error message for remote user.
Err(e)?;
}
};
}
None => break,
}
}
// Since the ingest path above is still todo, this currently always errors.
let res = collector
.ok_or_else(|| Error::with_msg_no_trace(format!("no collector created")))?
.result()?;
let ret = serde_json::to_value(&res)?;
debug!("Total duration: {:?}", total_duration);
Ok(ret)
}

View File

@@ -5,10 +5,7 @@ use bytes::{Buf, BytesMut};
use err::Error;
use futures_util::{Stream, StreamExt};
use items::eventfull::EventFull;
use items::{
RangeCompletableItem, StatsItem,
StreamItem, WithLen,
};
use items::{RangeCompletableItem, StatsItem, StreamItem, WithLen};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::timeunits::SEC;

View File

@@ -1,6 +1,5 @@
use super::inmem::InMemoryFrameAsyncReadStream;
use futures_core::Stream;
use futures_util::StreamExt;
use futures_util::{Stream, StreamExt};
use items::frame::decode_frame;
use items::{FrameTypeInnerStatic, Sitemty, StreamItem};
use netpod::log::*;

View File

@@ -1,7 +1,6 @@
use bytes::{BufMut, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::pin_mut;
use futures_util::{pin_mut, Stream};
use items::inmem::InMemoryFrame;
use items::StreamItem;
use items::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};

View File

@@ -1,7 +1,10 @@
pub mod collect;
pub mod dtflags;
pub mod eventchunker;
pub mod filechunkread;
pub mod frames;
pub mod merge;
pub mod needminbuffer;
pub mod plaineventsjson;
pub mod rangefilter;
pub mod tcprawclient;

50
streams/src/merge.rs Normal file
View File

@@ -0,0 +1,50 @@
// Sets up the raw tcp connections: disk::merge::mergedfromremotes::MergedFromRemotes
// and then sets up a disk::merge::MergedStream
pub mod mergedstream;
use crate::frames::eventsfromframes::EventsFromFrames;
use crate::frames::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_util::Stream;
use items::frame::make_frame;
use items::frame::make_term_frame;
use items::sitem_data;
use items::EventQueryJsonStringFrame;
use items::Sitemty;
use items_2::ChannelEvents;
use netpod::log::*;
use netpod::Cluster;
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
pub type ChannelEventsBoxedStream = Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;
/// Open one raw TCP event stream per cluster node.
///
/// For each node: connects to `host:port_raw`, sends the serialized `query`
/// as a JSON-string frame followed by a terminating frame, then wraps the
/// read half in a frame decoder that yields `ChannelEvents` items.
/// The write halves are intentionally leaked via `forget()` so the
/// connections stay open while the read streams are consumed.
pub async fn open_tcp_streams(
query: &dyn erased_serde::Serialize,
cluster: &Cluster,
) -> Result<Vec<ChannelEventsBoxedStream>, Error> {
// TODO when unit tests established, change to async connect:
let mut streams = Vec::new();
for node in &cluster.nodes {
debug!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw);
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
let qjs = serde_json::to_string(&query)?;
let (netin, mut netout) = net.into_split();
let item = EventQueryJsonStringFrame(qjs);
let item = sitem_data(item);
let buf = make_frame(&item)?;
netout.write_all(&buf).await?;
// Terminating frame signals end of the query to the peer.
let buf = make_term_frame()?;
netout.write_all(&buf).await?;
netout.flush().await?;
netout.forget();
// TODO for images, we need larger buffer capacity
let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 128);
type ITEM = ChannelEvents;
let stream = EventsFromFrames::<_, ITEM>::new(frames);
streams.push(Box::pin(stream) as _);
}
Ok(streams)
}

View File

@@ -0,0 +1,308 @@
use err::Error;
use futures_util::{Stream, StreamExt};
use items::ByteEstimate;
use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithTimestamps};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::ByteSize;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
// TODO compare with items_2::merger::*
const LOG_EMIT_ITEM: bool = false;
/// State of one input stream's current item batch.
enum MergedCurVal<T> {
// No current batch; input must be polled again.
None,
// Input stream ended; never poll it again.
Finish,
// A batch we are currently draining by index.
Val(T),
}
/// K-way merge of several `Sitemty<ITY>` streams, ordered by event timestamp.
/// Emits batches once they reach `batch_size` (byte estimate) or at end of input.
pub struct MergedStream<S, ITY> {
inps: Vec<S>,
// One MergedCurVal per input, parallel to `inps`.
current: Vec<MergedCurVal<ITY>>,
// Per-input read index into the current batch.
ixs: Vec<usize>,
errored: bool,
completed: bool,
// Output batch being accumulated.
batch: Option<ITY>,
// Timestamp of the last merged event; used to detect unordered input.
ts_last_emit: u64,
// Per-input flag: that input reported RangeComplete.
range_complete_observed: Vec<bool>,
range_complete_observed_all: bool,
range_complete_observed_all_emitted: bool,
data_emit_complete: bool,
// Threshold (byte estimate) at which a batch is flushed.
batch_size: ByteSize,
batch_len_emit_histo: HistoLog2,
// Log and stats items are buffered and re-emitted ahead of data.
logitems: VecDeque<LogItem>,
stats_items: VecDeque<StatsItem>,
}
impl<S, ITY> Drop for MergedStream<S, ITY> {
fn drop(&mut self) {
// TODO collect somewhere
debug!(
"MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}",
self.batch_len_emit_histo
);
}
}
impl<S, ITY> MergedStream<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Appendable + Unpin,
{
/// Create a merger over the given input streams.
pub fn new(inps: Vec<S>) -> Self {
trace!("MergedStream::new");
let n = inps.len();
let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
Self {
inps,
current: current,
ixs: vec![0; n],
errored: false,
completed: false,
batch: None,
ts_last_emit: 0,
range_complete_observed: vec![false; n],
range_complete_observed_all: false,
range_complete_observed_all_emitted: false,
data_emit_complete: false,
batch_size: ByteSize::kb(128),
batch_len_emit_histo: HistoLog2::new(0),
logitems: VecDeque::new(),
stats_items: VecDeque::new(),
}
}
/// Poll every input that has no current batch until each yields a data
/// batch, finishes, or is pending. Log/stats items are buffered; a
/// RangeComplete marks the input and polling continues.
/// Returns Pending if any polled input was pending.
fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
use Poll::*;
let mut pending = 0;
for i1 in 0..self.inps.len() {
match self.current[i1] {
MergedCurVal::None => {
// Inner loop: keep polling this one input past non-data items.
'l1: loop {
break match self.inps[i1].poll_next_unpin(cx) {
Ready(Some(Ok(k))) => match k {
StreamItem::Log(item) => {
self.logitems.push_back(item);
continue 'l1;
}
StreamItem::Stats(item) => {
self.stats_items.push_back(item);
continue 'l1;
}
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
self.range_complete_observed[i1] = true;
let d = self.range_complete_observed.iter().filter(|&&k| k).count();
if d == self.range_complete_observed.len() {
self.range_complete_observed_all = true;
debug!("MergedStream range_complete d {} COMPLETE", d);
} else {
trace!("MergedStream range_complete d {}", d);
}
continue 'l1;
}
RangeCompletableItem::Data(item) => {
self.ixs[i1] = 0;
self.current[i1] = MergedCurVal::Val(item);
}
},
},
Ready(Some(Err(e))) => {
// TODO emit this error, consider this stream as done, anything more to do here?
//self.current[i1] = CurVal::Err(e);
self.errored = true;
return Ready(Err(e));
}
Ready(None) => {
self.current[i1] = MergedCurVal::Finish;
}
Pending => {
pending += 1;
}
};
}
}
_ => (),
}
}
if pending > 0 {
Pending
} else {
Ready(Ok(()))
}
}
}
// Merge state machine. Emission priority per poll: buffered log items,
// buffered stats items, then merged data batches; a single RangeComplete is
// emitted only after all data and only if every input reported it.
impl<S, ITY> Stream for MergedStream<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: PushableIndex + Appendable + ByteEstimate + WithTimestamps + Unpin,
{
type Item = Sitemty<ITY>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
break if self.completed {
panic!("poll_next on completed");
} else if self.errored {
self.completed = true;
Ready(None)
} else if let Some(item) = self.logitems.pop_front() {
Ready(Some(Ok(StreamItem::Log(item))))
} else if let Some(item) = self.stats_items.pop_front() {
Ready(Some(Ok(StreamItem::Stats(item))))
} else if self.range_complete_observed_all_emitted {
self.completed = true;
Ready(None)
} else if self.data_emit_complete {
if self.range_complete_observed_all {
// NOTE(review): this inner flag check is unreachable — the
// `else if` above already handled the emitted case.
if self.range_complete_observed_all_emitted {
self.completed = true;
Ready(None)
} else {
self.range_complete_observed_all_emitted = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
}
} else {
self.completed = true;
Ready(None)
}
} else {
match self.replenish(cx) {
Ready(Ok(_)) => {
// Find the input whose current event has the smallest timestamp.
let mut lowest_ix = usize::MAX;
let mut lowest_ts = u64::MAX;
for i1 in 0..self.inps.len() {
if let MergedCurVal::Val(val) = &self.current[i1] {
let u = self.ixs[i1];
if u >= val.len() {
// Batch exhausted: clear it and replenish again.
self.ixs[i1] = 0;
self.current[i1] = MergedCurVal::None;
continue 'outer;
} else {
let ts = val.ts(u);
if ts < lowest_ts {
lowest_ix = i1;
lowest_ts = ts;
}
}
}
}
if lowest_ix == usize::MAX {
// No input has data left: flush the final partial batch.
if let Some(batch) = self.batch.take() {
if batch.len() != 0 {
self.batch_len_emit_histo.ingest(batch.len() as u32);
self.data_emit_complete = true;
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..batch.len() {
aa.push(batch.ts(ii));
}
debug!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
} else {
self.data_emit_complete = true;
continue 'outer;
}
} else {
self.data_emit_complete = true;
continue 'outer;
}
} else {
// TODO unordered cases
if lowest_ts < self.ts_last_emit {
self.errored = true;
let msg = format!(
"unordered event at lowest_ts {} ts_last_emit {}",
lowest_ts, self.ts_last_emit
);
return Ready(Some(Err(Error::with_public_msg(msg))));
} else {
self.ts_last_emit = self.ts_last_emit.max(lowest_ts);
}
{
// Move the winning event into the output batch.
let batch = self.batch.take();
let rix = self.ixs[lowest_ix];
match &self.current[lowest_ix] {
MergedCurVal::Val(val) => {
let mut ldst = batch.unwrap_or_else(|| val.empty_like_self());
if false {
info!(
"Push event rix {} lowest_ix {} lowest_ts {}",
rix, lowest_ix, lowest_ts
);
}
ldst.push_index(val, rix);
self.batch = Some(ldst);
}
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
}
}
self.ixs[lowest_ix] += 1;
let curlen = match &self.current[lowest_ix] {
MergedCurVal::Val(val) => val.len(),
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
};
if self.ixs[lowest_ix] >= curlen {
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedCurVal::None;
}
// Flush once the accumulated batch exceeds the size threshold.
let emit_packet_now = if let Some(batch) = &self.batch {
if batch.byte_estimate() >= self.batch_size.bytes() as u64 {
true
} else {
false
}
} else {
false
};
if emit_packet_now {
if let Some(batch) = self.batch.take() {
trace!("emit item because over threshold len {}", batch.len());
self.batch_len_emit_histo.ingest(batch.len() as u32);
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..batch.len() {
aa.push(batch.ts(ii));
}
debug!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
} else {
continue 'outer;
}
} else {
continue 'outer;
}
}
}
Ready(Err(e)) => {
self.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
};
}
}
}
#[cfg(test)]
mod test {
    use items_2::{ChannelEvents, Empty};

    /// Smoke test: an `EventsDim0` container can be boxed into the
    /// `ChannelEvents::Events` variant.
    #[test]
    fn merge_channel_events() {
        let mut events = items_2::eventsdim0::EventsDim0::empty();
        events.push(1, 100, 17u8);
        events.push(3, 300, 16);
        let _wrapped = ChannelEvents::Events(Box::new(events));
    }
}

View File

@@ -0,0 +1,73 @@
use crate::merge::open_tcp_streams;
use bytes::Bytes;
use err::Error;
use futures_util::{future, stream, FutureExt, Stream, StreamExt};
use items::streams::collect_plain_events_json;
use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem};
use items_2::ChannelEvents;
use netpod::log::*;
use netpod::Cluster;
use serde::Serialize;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
/// Newtype wrapper around a boxed byte stream, so callers get a concrete
/// named type instead of a boxed trait object.
pub struct BytesStream(Pin<Box<dyn Stream<Item = Sitemty<Bytes>> + Send>>);
impl Stream for BytesStream {
type Item = Sitemty<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Delegate straight to the inner boxed stream.
StreamExt::poll_next_unpin(&mut self, cx)
}
}
/// Execute a plain events query against all cluster nodes, merge the
/// per-node streams, collect the events and return the JSON result as a
/// one-item byte stream.
///
/// NOTE(review): `timeout`, `events_max` and `do_log` are declared but not
/// yet used — the merge loop below currently runs unbounded; presumably
/// these are meant to feed a collect helper later (see TODO below).
pub async fn plain_events_json<SER>(query: SER, cluster: &Cluster) -> Result<BytesStream, Error>
where
SER: Serialize,
{
let inps = open_tcp_streams(&query, cluster).await?;
let mut merged = items_2::merger::ChannelEventsMerger::new(inps);
let timeout = Duration::from_millis(2000);
let events_max = 100;
let do_log = false;
// Lazily created from the first events item, since the collector type
// depends on the concrete event container.
let mut coll = None;
while let Some(item) = merged.next().await {
let item = item?;
match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => todo!(),
RangeCompletableItem::Data(item) => match item {
ChannelEvents::Events(mut item) => {
if coll.is_none() {
coll = Some(item.new_collector());
}
let coll = coll
.as_mut()
.ok_or_else(|| Error::with_msg_no_trace(format!("no collector")))?;
coll.ingest(&mut item);
}
ChannelEvents::Status(_) => todo!(),
},
},
StreamItem::Log(item) => {
info!("log {item:?}");
}
StreamItem::Stats(item) => {
info!("stats {item:?}");
}
}
}
// TODO compare with
// streams::collect::collect_plain_events_json
// and remove duplicate functionality.
let mut coll = coll.ok_or_else(|| Error::with_msg_no_trace(format!("no collector created")))?;
let res = coll.result()?;
// TODO factor the conversion of the result out to a higher level.
// The output of this function should again be collectable, maybe even binnable and otherwise processable.
let js = serde_json::to_vec(&res)?;
let item = sitem_data(Bytes::from(js));
// Wrap the serialized result as a single-item stream.
let stream = stream::once(future::ready(item));
let stream = BytesStream(Box::pin(stream));
Ok(stream)
}

View File

@@ -1,6 +1,5 @@
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use futures_util::{Stream, StreamExt};
use items::StatsItem;
use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps};
use netpod::{log::*, RangeFilterStats};

View File

@@ -8,7 +8,7 @@ to request such data from nodes.
use crate::frames::eventsfromframes::EventsFromFrames;
use crate::frames::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_core::Stream;
use futures_util::Stream;
use items::eventfull::EventFull;
use items::frame::{make_frame, make_term_frame};
use items::{EventQueryJsonStringFrame, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem};