This commit is contained in:
Dominik Werder
2022-12-01 22:45:44 +01:00
parent 74af61f7fb
commit 4a250227cd
7 changed files with 76 additions and 21 deletions

View File

@@ -322,6 +322,8 @@ async fn http_service_inner(
h.handle(req, ctx, &node_config).await
} else if let Some(h) = channel_status::ChannelConnectionStatusEvents::handler(&req) {
h.handle(req, ctx, &node_config).await
} else if let Some(h) = api4::binned::BinnedHandler::handler(&req) {
h.handle(req, &node_config).await
} else if path == "/api/4/binned" {
if req.method() == Method::GET {
Ok(binned(req, ctx, node_config).await?)

View File

@@ -116,3 +116,39 @@ impl Collectable for Box<dyn Events> {
EventsCollector::new(coll)
}
}
#[derive(Debug)]
pub struct TimeBinnedCollector {}
impl Collector for TimeBinnedCollector {
type Input = Box<dyn crate::TimeBinned>;
type Output = Box<dyn Collected>;
fn len(&self) -> usize {
todo!()
}
fn ingest(&mut self, _item: &mut Self::Input) {
todo!()
}
fn set_range_complete(&mut self) {
todo!()
}
fn set_timed_out(&mut self) {
todo!()
}
fn result(&mut self) -> Result<Self::Output, Error> {
todo!()
}
}
impl Collectable for Box<dyn crate::TimeBinned> {
type Collector = TimeBinnedCollector;
fn new_collector(&self) -> Self::Collector {
todo!()
}
}

View File

@@ -65,7 +65,6 @@ async fn events_conn_handler_inner_try(
{
match k {
Ok(StreamItem::DataItem(item)) => {
info!("GOT FRAME: {:?}", item);
frames.push(item);
}
Ok(item) => {

View File

@@ -12,3 +12,4 @@ pub mod tcprawclient;
#[cfg(test)]
pub mod test;
pub mod timebin;
pub mod timebinnedjson;

View File

@@ -1,28 +1,12 @@
use crate::tcprawclient::open_tcp_streams;
use bytes::Bytes;
use err::Error;
use futures_util::{Stream, StreamExt};
use items::Sitemty;
#[allow(unused)]
use netpod::log::*;
use netpod::Cluster;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
pub struct BytesStream(Pin<Box<dyn Stream<Item = Sitemty<Bytes>> + Send>>);
impl Stream for BytesStream {
type Item = Sitemty<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
StreamExt::poll_next_unpin(&mut self, cx)
}
}
// TODO remove?
pub async fn plain_events_json<SER>(query: SER, cluster: &Cluster) -> Result<JsonValue, Error>
where
SER: Serialize,
@@ -41,7 +25,7 @@ where
stream
};
let stream = { items_2::merger::Merger::new(inps, 1) };
let deadline = Instant::now() + Duration::from_millis(8000);
let deadline = Instant::now() + Duration::from_millis(3500);
let events_max = 100;
let collected = crate::collect::collect(stream, deadline, events_max).await?;
let jsval = serde_json::to_value(&collected)?;

View File

@@ -1,6 +1,7 @@
use err::Error;
use futures_util::{Future, FutureExt, Stream, StreamExt};
use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem};
//use items_0::{TimeBinnable, TimeBinner};
use items_2::timebin::{TimeBinnable, TimeBinner};
use netpod::log::*;
use std::fmt;
@@ -11,19 +12,19 @@ use std::time::Instant;
#[allow(unused)]
macro_rules! trace2 {
(D$($arg:tt)*) => ();
($($arg:tt)*) => (eprintln!($($arg)*));
($($arg:tt)*) => (trace!($($arg)*));
}
#[allow(unused)]
macro_rules! trace3 {
(D$($arg:tt)*) => ();
($($arg:tt)*) => (eprintln!($($arg)*));
($($arg:tt)*) => (trace!($($arg)*));
}
#[allow(unused)]
macro_rules! trace4 {
(D$($arg:tt)*) => ();
($($arg:tt)*) => (eprintln!($($arg)*));
($($arg:tt)*) => (trace!($($arg)*));
}
type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;

View File

@@ -0,0 +1,32 @@
use crate::tcprawclient::open_tcp_streams;
use err::Error;
use futures_util::StreamExt;
#[allow(unused)]
use netpod::log::*;
use netpod::Cluster;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::time::{Duration, Instant};
pub async fn timebinned_json<SER>(query: SER, cluster: &Cluster) -> Result<JsonValue, Error>
where
SER: Serialize,
{
// TODO should be able to ask for data-events only, instead of mixed data and status events.
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&query, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
let stream = { items_2::merger::Merger::new(inps, 1) };
let events_max = 10000;
let do_time_weight = true;
let deadline = Instant::now() + Duration::from_millis(7500);
let stream = Box::pin(stream);
let stream = crate::timebin::TimeBinnedStream::new(stream, Vec::new(), do_time_weight, deadline);
if false {
let mut stream = stream;
let _: Option<items::Sitemty<Box<dyn items_0::TimeBinned>>> = stream.next().await;
panic!()
}
let collected = crate::collect::collect(stream, deadline, events_max).await?;
let jsval = serde_json::to_value(&collected)?;
Ok(jsval)
}