From 6d82e6e8d4af12b20281cbc8356c3650cf60ec2f Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 20 Apr 2021 18:09:32 +0200
Subject: [PATCH] WIP

---
 disk/src/agg.rs   |  19 +++++--
 disk/src/cache.rs |  83 ++++++++++++++++++++++++------
 disk/src/merge.rs | 128 +++++++++++++++++++++++++++++++++++++++++++++-
 disk/src/raw.rs   |  14 ++---
 4 files changed, 215 insertions(+), 29 deletions(-)

diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 881da06..b8b8031 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -198,10 +198,21 @@ impl AggregatableXdim1Bin for ValuesDim0 {
 }
 
 pub struct MinMaxAvgScalarEventBatch {
-    tss: Vec<u64>,
-    mins: Vec<f32>,
-    maxs: Vec<f32>,
-    avgs: Vec<f32>,
+    pub tss: Vec<u64>,
+    pub mins: Vec<f32>,
+    pub maxs: Vec<f32>,
+    pub avgs: Vec<f32>,
+}
+
+impl MinMaxAvgScalarEventBatch {
+    pub fn empty() -> Self {
+        Self {
+            tss: vec![],
+            mins: vec![],
+            maxs: vec![],
+            avgs: vec![],
+        }
+    }
 }
 
 impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 7dbadd1..76ec9ab 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -1,4 +1,4 @@
-use crate::agg::MinMaxAvgScalarBinBatch;
+use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch};
 use crate::raw::EventsQuery;
 use bytes::{BufMut, Bytes, BytesMut};
 use chrono::{DateTime, Utc};
@@ -275,15 +275,7 @@ impl PreBinnedValueStream {
                 self.fut2 = Some(Box::pin(s));
             }
             None => {
-                error!("NO BETTER GRAN FOUND FOR g {}", g);
-                error!("TODO see in source cache.rs");
-
-                // create a client helper in raw.rs which can connect to a given node with parameters
-                // create tcp service in raw.rs
-                // set up tcp inputs
-                // set up merger
-                // set up T-binning
-                // save to cache file if input is complete
+                warn!("no better resolution found for g {}", g);
                 let evq = EventsQuery {
                     channel: self.channel.clone(),
                     range: NanoRange {
@@ -292,9 +284,10 @@ impl PreBinnedValueStream {
                     },
                     agg_kind: self.agg_kind.clone(),
                 };
+                let evq = Arc::new(evq);
                 self.fut2 = Some(Box::pin(PreBinnedAssembledFromRemotes::new(
                     evq,
-                    &self.node_config.cluster,
+                    self.node_config.cluster.clone(),
                 )));
             }
         }
@@ -426,11 +419,26 @@ impl Stream for PreBinnedValueFetchedStream {
     }
 }
 
-pub struct PreBinnedAssembledFromRemotes {}
+type T001 = Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>> + Send>>;
+type T002 = Pin<Box<dyn Future<Output = Result<T001, Error>> + Send>>;
+pub struct PreBinnedAssembledFromRemotes {
+    tcp_establish_futs: Vec<T002>,
+    nodein: Vec<Option<T001>>,
+}
 
 impl PreBinnedAssembledFromRemotes {
-    pub fn new(evq: EventsQuery, cluster: &Cluster) -> Self {
-        err::todoval()
+    pub fn new(evq: Arc<EventsQuery>, cluster: Arc<Cluster>) -> Self {
+        let mut tcp_establish_futs = vec![];
+        for node in &cluster.nodes {
+            let f = super::raw::x_processed_stream_from_node(evq.clone(), node.clone());
+            let f: T002 = Box::pin(f);
+            tcp_establish_futs.push(f);
+        }
+        let n = tcp_establish_futs.len();
+        Self {
+            tcp_establish_futs,
+            nodein: (0..n).into_iter().map(|_| None).collect(),
+        }
     }
 }
 
@@ -438,12 +446,46 @@ impl Stream for PreBinnedAssembledFromRemotes {
     // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
    type Item = Result<MinMaxAvgScalarBinBatch, Error>;
 
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        info!("PreBinnedAssembledFromRemotes MAIN POLL");
         use Poll::*;
         // TODO this has several stages:
         // First, establish async all connections.
         // Then assemble the merge-and-processing-pipeline and pull from there.
-        err::todoval()
+        'outer: loop {
+            {
+                let mut pend = false;
+                let mut c1 = 0;
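+                // Poll each connection future that has not yet produced its event stream;
+                // c1 counts the inputs whose stream was already established before this pass.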
+                for i1 in 0..self.tcp_establish_futs.len() {
+                    if self.nodein[i1].is_none() {
+                        let f = &mut self.tcp_establish_futs[i1];
+                        pin_mut!(f);
+                        info!("tcp_establish_futs POLLING INPUT ESTAB {}", i1);
+                        match f.poll(cx) {
+                            Ready(Ok(k)) => {
+                                info!("ESTABLISHED INPUT {}", i1);
+                                self.nodein[i1] = Some(k);
+                            }
+                            Ready(Err(e)) => return Ready(Some(Err(e))),
+                            Pending => {
+                                pend = true;
+                            }
+                        }
+                    } else {
+                        c1 += 1;
+                    }
+                }
+                if pend {
+                    break Pending;
+                } else {
+                    if c1 == self.tcp_establish_futs.len() {
+                        // TODO set up the merged stream
+                        let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
+                        super::merge::MergedMinMaxAvgScalarStream::new(inps);
+                    }
+                }
+            }
+        }
     }
 }
 
@@ -465,7 +507,14 @@ impl BinnedStream {
             })
             .flatten()
             .map(|k| {
-                info!("ITEM {:?}", k);
+                match k {
+                    Ok(ref k) => {
+                        info!("BinnedStream got good item {:?}", k);
+                    }
+                    Err(_) => {
+                        error!("BinnedStream got error")
+                    }
+                }
                 k
             });
         Self { inp: Box::pin(inp) }
diff --git a/disk/src/merge.rs b/disk/src/merge.rs
index f0dcdea..ba7378c 100644
--- a/disk/src/merge.rs
+++ b/disk/src/merge.rs
@@ -1,4 +1,4 @@
-use crate::agg::{Dim1F32Stream, ValuesDim1};
+use crate::agg::{Dim1F32Stream, MinMaxAvgScalarEventBatch, ValuesDim1};
 use crate::EventFull;
 use err::Error;
 use futures_core::Stream;
@@ -130,3 +130,129 @@ enum CurVal {
     Err(Error),
     Val(ValuesDim1),
 }
+
+/*
+
+============== MergedMinMaxAvgScalarStream
+
+*/
+
+pub struct MergedMinMaxAvgScalarStream<S>
+where
+    S: Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>>,
+{
+    inps: Vec<S>,
+    current: Vec<MergedMinMaxAvgScalarStreamCurVal>,
+    ixs: Vec<usize>,
+    emitted_complete: bool,
+    batch: MinMaxAvgScalarEventBatch,
+}
+
+impl<S> MergedMinMaxAvgScalarStream<S>
+where
+    S: Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>>,
+{
+    pub fn new(inps: Vec<S>) -> Self {
+        let n = inps.len();
+        let current = (0..n)
+            .into_iter()
+            .map(|_k| MergedMinMaxAvgScalarStreamCurVal::None)
+            .collect();
+        Self {
+            inps,
+            current: current,
+            ixs: vec![0; n],
+            emitted_complete: false,
+            batch: MinMaxAvgScalarEventBatch::empty(),
+        }
+    }
+}
+
+impl<S> Stream for MergedMinMaxAvgScalarStream<S>
+where
+    S: Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>> + Unpin,
+{
+    type Item = Result<MinMaxAvgScalarEventBatch, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        'outer: loop {
+            if self.emitted_complete {
+                panic!("poll on complete stream");
+            }
+            // can only run logic if all streams are either finished, errored or have some current value.
+            for i1 in 0..self.inps.len() {
+                match self.current[i1] {
+                    MergedMinMaxAvgScalarStreamCurVal::None => {
+                        match self.inps[i1].poll_next_unpin(cx) {
+                            Ready(Some(Ok(k))) => {
+                                self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(k);
+                            }
+                            Ready(Some(Err(e))) => {
+                                // TODO emit this error, consider this stream as done, anything more to do here?
+                                //self.current[i1] = CurVal::Err(e);
+                                return Ready(Some(Err(e)));
+                            }
+                            Ready(None) => {
+                                self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Finish;
+                            }
+                            Pending => {
+                                // TODO is this behaviour correct?
+                                return Pending;
+                            }
+                        }
+                    }
+                    _ => (),
+                }
+            }
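+            // Every input now either carries a current batch or is finished:
+            // pick the input whose next unconsumed event has the smallest timestamp.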
+            let mut lowest_ix = usize::MAX;
+            let mut lowest_ts = u64::MAX;
+            for i1 in 0..self.inps.len() {
+                match &self.current[i1] {
+                    MergedMinMaxAvgScalarStreamCurVal::Finish => {}
+                    MergedMinMaxAvgScalarStreamCurVal::Val(val) => {
+                        let u = self.ixs[i1];
+                        if u >= val.tss.len() {
+                            self.ixs[i1] = 0;
+                            self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None;
+                            continue 'outer;
+                        } else {
+                            let ts = val.tss[u];
+                            if ts < lowest_ts {
+                                lowest_ix = i1;
+                                lowest_ts = ts;
+                            }
+                        }
+                    }
+                    _ => panic!(),
+                }
+            }
+            if lowest_ix == usize::MAX {
+                // TODO all inputs in finished state
+                break Ready(None);
+            } else {
+                info!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
+                self.batch.tss.push(lowest_ts);
+                let rix = self.ixs[lowest_ix];
+                let z = match &self.current[lowest_ix] {
+                    MergedMinMaxAvgScalarStreamCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix]),
+                    _ => panic!(),
+                };
+                self.batch.mins.push(z.0);
+                self.batch.maxs.push(z.1);
+                self.batch.avgs.push(z.2);
+                self.ixs[lowest_ix] += 1;
+            }
+            if self.batch.tss.len() >= 64 {
+                let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
+                break Ready(Some(Ok(k)));
+            }
+        }
+    }
+}
+
+enum MergedMinMaxAvgScalarStreamCurVal {
+    None,
+    Finish,
+    Val(MinMaxAvgScalarEventBatch),
+}
diff --git a/disk/src/raw.rs b/disk/src/raw.rs
index 9aeaede..292f376 100644
--- a/disk/src/raw.rs
+++ b/disk/src/raw.rs
@@ -24,7 +24,7 @@ use tracing::{debug, error, info, trace, warn};
 /**
 Query parameters to request (optionally) X-processed, but not T-processed events.
 */
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct EventsQuery {
     pub channel: Channel,
     pub range: NanoRange,
@@ -32,16 +32,16 @@ pub struct EventsQuery {
 }
 
 pub async fn x_processed_stream_from_node(
-    query: &EventsQuery,
-    node: &Node,
-) -> Result<Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>>>>, Error> {
+    query: Arc<EventsQuery>,
+    node: Arc<Node>,
+) -> Result<Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>> + Send>>, Error> {
     let mut net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
-    let qjs = serde_json::to_vec(query)?;
+    let qjs = serde_json::to_vec(query.as_ref())?;
     net.write_u32_le(qjs.len() as u32).await?;
     net.write_all(&qjs).await?;
     net.flush().await?;
     let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: net };
-    let s3: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>>>> = Box::pin(s2);
+    let s3: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>> + Send>> = Box::pin(s2);
     Ok(s3)
 }
 
@@ -161,7 +161,7 @@ pub async fn raw_service(node_config: Arc) -> Result<(), Error> {
     }
 }
 
-async fn raw_conn_handler(mut stream: TcpStream, addr: SocketAddr) -> Result<(), Error> {
+async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Error> {
     info!("RAW HANDLER for {:?}", addr);
     let (netin, mut netout) = stream.into_split();
     InMemoryFrameAsyncReadStream::new(netin);