diff --git a/disk/src/merge.rs b/disk/src/merge.rs index b6ef449..3f87a13 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,3 +1,5 @@ +pub mod mergedblobsfromremotes; + use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -10,9 +12,6 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; -pub mod mergedblobsfromremotes; -pub mod mergedfromremotes; - const LOG_EMIT_ITEM: bool = false; enum MergedCurVal { diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs deleted file mode 100644 index 0fb8fbb..0000000 --- a/disk/src/merge/mergedfromremotes.rs +++ /dev/null @@ -1,117 +0,0 @@ -use crate::merge::MergedStream; -use err::Error; -use futures_core::Stream; -use futures_util::{pin_mut, StreamExt}; -use items::{Appendable, Clearable, EventsNodeProcessor, FrameType, PushableIndex, Sitemty}; -use netpod::log::*; -use netpod::query::RawEventsQuery; -use netpod::{Cluster, PerfOpts}; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use streams::tcprawclient::x_processed_stream_from_node; - -type T001 = Pin> + Send>>; -type T002 = Pin, Error>> + Send>>; - -pub struct MergedFromRemotes -where - ENP: EventsNodeProcessor, -{ - tcp_establish_futs: Vec::Output>>, - nodein: Vec::Output>>>, - merged: Option::Output>>, - completed: bool, - errored: bool, -} - -impl MergedFromRemotes -where - ENP: EventsNodeProcessor + 'static, - ::Output: Unpin + PushableIndex + Appendable + Clearable + 'static, - Sitemty<::Output>: FrameType, -{ - pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { - debug!("MergedFromRemotes evq {:?}", evq); - let mut tcp_establish_futs = vec![]; - for node in &cluster.nodes { - let f = x_processed_stream_from_node::(evq.clone(), perf_opts.clone(), node.clone()); - let f: T002<::Output> = Box::pin(f); - tcp_establish_futs.push(f); - } - let n = tcp_establish_futs.len(); - Self { - tcp_establish_futs, - nodein: (0..n).into_iter().map(|_| None).collect(), - merged: None, - completed: false, - errored: false, - } - } -} - -impl Stream for MergedFromRemotes -where - ENP: EventsNodeProcessor + 'static, - ::Output: PushableIndex + Appendable + Clearable, -{ - type Item = Sitemty<::Output>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("poll_next on completed"); - } else if self.errored { - self.completed = true; - return Ready(None); - } else if let Some(fut) = &mut self.merged { - match fut.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => Ready(Some(Ok(k))), - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - self.completed = true; - Ready(None) - } - Pending => Pending, - } - } else { - let mut pend = false; - let mut c1 = 0; - for i1 in 0..self.tcp_establish_futs.len() { - if self.nodein[i1].is_none() { - let f = &mut self.tcp_establish_futs[i1]; - pin_mut!(f); - match f.poll(cx) { - Ready(Ok(k)) => { - self.nodein[i1] = Some(k); - } - Ready(Err(e)) => { - self.errored = true; - return Ready(Some(Err(e))); - } - Pending => { - pend = true; - } - } - } else { - c1 += 1; - } - } - if pend { - Pending - } else { - if c1 == self.tcp_establish_futs.len() { - let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); - let s1 = MergedStream::<_, ::Output>::new(inps); - self.merged = Some(Box::pin(s1)); - } - continue 'outer; - } - }; - } - } -}