WIP
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use err::Error;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::collect_s::Collectable;
|
||||
@@ -46,28 +47,161 @@ macro_rules! trace4 {
|
||||
pub struct Collect {
|
||||
inp: CollectableStreamBox,
|
||||
deadline: Instant,
|
||||
events_max: u64,
|
||||
range: Option<SeriesRange>,
|
||||
binrange: Option<BinnedRangeEnum>,
|
||||
collector: Option<Box<dyn Collector>>,
|
||||
range_complete: bool,
|
||||
timeout: bool,
|
||||
timer: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
done_input: bool,
|
||||
}
|
||||
|
||||
impl Collect {
|
||||
pub fn new<INP>(inp: INP, deadline: Instant) -> Self
|
||||
pub fn new<INP>(
|
||||
inp: INP,
|
||||
deadline: Instant,
|
||||
events_max: u64,
|
||||
range: Option<SeriesRange>,
|
||||
binrange: Option<BinnedRangeEnum>,
|
||||
) -> Self
|
||||
where
|
||||
INP: CollectableStreamTrait + 'static,
|
||||
{
|
||||
let timer = tokio::time::sleep_until(deadline.into());
|
||||
Self {
|
||||
inp: CollectableStreamBox(Box::pin(inp)),
|
||||
deadline,
|
||||
events_max,
|
||||
range,
|
||||
binrange,
|
||||
collector: None,
|
||||
range_complete: false,
|
||||
timeout: false,
|
||||
timer: Box::pin(timer),
|
||||
done_input: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_item(&mut self, item: Sitemty<Box<dyn Collectable>>) -> Result<(), Error> {
|
||||
match item {
|
||||
Ok(item) => match item {
|
||||
StreamItem::DataItem(item) => match item {
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
self.range_complete = true;
|
||||
if let Some(coll) = self.collector.as_mut() {
|
||||
coll.set_range_complete();
|
||||
} else {
|
||||
warn!("collect received RangeComplete but no collector yet");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
RangeCompletableItem::Data(mut item) => {
|
||||
info!("collect sees len {}", item.len());
|
||||
let coll = self.collector.get_or_insert_with(|| item.new_collector());
|
||||
coll.ingest(&mut item);
|
||||
if coll.len() as u64 >= self.events_max {
|
||||
warn!(
|
||||
"TODO compute continue-at reached events_max {} abort",
|
||||
self.events_max
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
StreamItem::Log(item) => {
|
||||
trace!("collect log {:?}", item);
|
||||
Ok(())
|
||||
}
|
||||
StreamItem::Stats(item) => {
|
||||
trace!("collect stats {:?}", item);
|
||||
match item {
|
||||
// TODO factor and simplify the stats collection:
|
||||
StatsItem::EventDataReadStats(_) => {}
|
||||
StatsItem::RangeFilterStats(_) => {}
|
||||
StatsItem::DiskStats(item) => match item {
|
||||
DiskStats::OpenStats(k) => {
|
||||
//total_duration += k.duration;
|
||||
}
|
||||
DiskStats::SeekStats(k) => {
|
||||
//total_duration += k.duration;
|
||||
}
|
||||
DiskStats::ReadStats(k) => {
|
||||
//total_duration += k.duration;
|
||||
}
|
||||
DiskStats::ReadExactStats(k) => {
|
||||
//total_duration += k.duration;
|
||||
}
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
// TODO Need to use some flags to get good enough error message for remote user.
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Collect {
|
||||
type Output = Sitemty<Box<dyn Collected>>;
|
||||
type Output = Result<Box<dyn Collected>, Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
use Poll::*;
|
||||
let span = tracing::span!(Level::INFO, "Collect");
|
||||
let _spg = span.enter();
|
||||
todo!()
|
||||
loop {
|
||||
break if self.done_input {
|
||||
if self.timeout {
|
||||
if let Some(coll) = self.collector.as_mut() {
|
||||
coll.set_timed_out();
|
||||
} else {
|
||||
warn!("collect timeout but no collector yet");
|
||||
}
|
||||
}
|
||||
// TODO use range_final and timeout in result.
|
||||
match self.collector.take() {
|
||||
Some(mut coll) => match coll.result(self.range.clone(), self.binrange.clone()) {
|
||||
Ok(res) => {
|
||||
//info!("collect stats total duration: {:?}", total_duration);
|
||||
Ready(Ok(res))
|
||||
}
|
||||
Err(e) => Ready(Err(e)),
|
||||
},
|
||||
None => {
|
||||
let e = Error::with_msg_no_trace(format!("no result because no collector was created"));
|
||||
error!("{e}");
|
||||
Ready(Err(e))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match self.timer.poll_unpin(cx) {
|
||||
Ready(()) => {
|
||||
self.timeout = true;
|
||||
self.done_input = true;
|
||||
continue;
|
||||
}
|
||||
Pending => match self.inp.0.poll_next_unpin(cx) {
|
||||
Ready(Some(item)) => match self.handle_item(item) {
|
||||
Ok(()) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
Ready(Err(e))
|
||||
}
|
||||
},
|
||||
Ready(None) => {
|
||||
self.done_input = true;
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,7 +214,7 @@ async fn collect_in_span<T, S>(
|
||||
) -> Result<Box<dyn Collected>, Error>
|
||||
where
|
||||
S: Stream<Item = Sitemty<T>> + Unpin,
|
||||
T: Collectable + WithLen + fmt::Debug,
|
||||
T: Collectable,
|
||||
{
|
||||
info!("collect events_max {events_max} deadline {deadline:?}");
|
||||
let mut collector: Option<Box<dyn Collector>> = None;
|
||||
|
||||
@@ -105,10 +105,9 @@ pub async fn plain_events_json(evq: &PlainEventsQuery, chconf: &ChConf, cluster:
|
||||
let stream = PlainEventStream::new(stream);
|
||||
let stream = EventsToTimeBinnable::new(stream);
|
||||
let stream = TimeBinnableToCollectable::new(stream);
|
||||
let collected = Collect::new(stream, deadline, events_max, Some(evq.range().clone()), None).await;
|
||||
|
||||
// TODO allow Collect to respect events_max and give range to compute continue-at.
|
||||
//let collected = crate::collect::collect(stream, deadline, events_max, Some(evq.range().clone()), None).await?;
|
||||
let collected = Collect::new(stream, deadline).await;
|
||||
let jsval = serde_json::to_value(&collected)?;
|
||||
Ok(jsval)
|
||||
}
|
||||
|
||||
@@ -1,20 +1,30 @@
|
||||
use crate::collect::Collect;
|
||||
use crate::test::runfut;
|
||||
use crate::transform::EventsToTimeBinnable;
|
||||
use crate::transform::TimeBinnableToCollectable;
|
||||
use err::Error;
|
||||
use futures_util::stream;
|
||||
use items_0::streamitem::sitem_data;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::WithLen;
|
||||
use items_2::eventsdim0::EventsDim0CollectorOutput;
|
||||
use items_2::streams::PlainEventStream;
|
||||
use items_2::testgen::make_some_boxed_d0_f32;
|
||||
use netpod::timeunits::SEC;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
#[test]
|
||||
fn collect_channel_events() -> Result<(), Error> {
|
||||
fn collect_channel_events_00() -> Result<(), Error> {
|
||||
let fut = async {
|
||||
let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487);
|
||||
let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583);
|
||||
let stream = stream::iter(vec![sitem_data(evs0), sitem_data(evs1)]);
|
||||
let stream = stream::iter(vec![
|
||||
sitem_data(evs0),
|
||||
sitem_data(evs1),
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
|
||||
]);
|
||||
let deadline = Instant::now() + Duration::from_millis(4000);
|
||||
let events_max = 10000;
|
||||
let res = crate::collect::collect(stream, deadline, events_max, None, None).await?;
|
||||
@@ -30,3 +40,32 @@ fn collect_channel_events() -> Result<(), Error> {
|
||||
};
|
||||
runfut(fut)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collect_channel_events_01() -> Result<(), Error> {
|
||||
let fut = async {
|
||||
let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487);
|
||||
let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583);
|
||||
let stream = stream::iter(vec![
|
||||
sitem_data(evs0),
|
||||
sitem_data(evs1),
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
|
||||
]);
|
||||
// TODO build like in request code
|
||||
let deadline = Instant::now() + Duration::from_millis(4000);
|
||||
let events_max = 10000;
|
||||
let stream = PlainEventStream::new(stream);
|
||||
let stream = EventsToTimeBinnable::new(stream);
|
||||
let stream = TimeBinnableToCollectable::new(stream);
|
||||
let res = Collect::new(stream, deadline, events_max, None, None).await?;
|
||||
if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<f32>>() {
|
||||
eprintln!("Great, a match");
|
||||
eprintln!("{res:?}");
|
||||
assert_eq!(res.len(), 40);
|
||||
} else {
|
||||
return Err(Error::with_msg(format!("bad type of collected result")));
|
||||
}
|
||||
Ok(())
|
||||
};
|
||||
runfut(fut)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user