diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 492360a..78b41c1 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -537,13 +537,8 @@ impl TBinnedBins for MinMaxAvgScalarBinBatch { } } -pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static -// TODO would it be better to express it here? -//where Result>, Error>: FrameType, -{ - type TBinnedStreamType: Stream>, Error>> - + Send - + 'static; +pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static { + type TBinnedStreamType: Stream>, Error>> + Send; type XBinnedEvents: XBinnedEvents; type TBinnedBins: TBinnedBins; type XBinnedToTBinnedAggregator; @@ -633,7 +628,6 @@ impl BinnedStreamKind for BinnedStreamKindScalar { ) -> Result { let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone()); let s = Self::xbinned_to_tbinned(s, range); - //let s = crate::agg::binnedt::IntoBinnedT::::into_binned_t(s, range); Ok(BoxedStream::new(Box::pin(s))?) } diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index feec921..23dd52a 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -12,23 +12,23 @@ use std::future::ready; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct BinnedScalarStreamFromPreBinnedPatches +pub struct BinnedScalarStreamFromPreBinnedPatches where - BK: BinnedStreamKind, + SK: BinnedStreamKind, { inp: Pin< Box< - dyn Stream::TBinnedBins>>, Error>> + dyn Stream::TBinnedBins>>, Error>> + Send, >, >, - _stream_kind: BK, + _stream_kind: SK, } -impl BinnedScalarStreamFromPreBinnedPatches +impl BinnedScalarStreamFromPreBinnedPatches where - BK: BinnedStreamKind, - Result::TBinnedBins>>, Error>: FrameType, + SK: BinnedStreamKind, + Result::TBinnedBins>>, Error>: FrameType, { pub fn new( patch_it: PreBinnedPatchIterator, @@ -39,7 +39,7 @@ where node_config: &NodeConfigCached, disk_stats_every: ByteSize, report_error: bool, - stream_kind: BK, + stream_kind: SK, ) -> Result { let patches: Vec<_> = patch_it.collect(); let mut sp = String::new(); @@ -51,11 +51,12 @@ where } info!("Using these pre-binned patches:\n{}", sp); } - let inp = futures_util::stream::iter(patches.into_iter()) + let pmax = patches.len(); + let inp = futures_util::stream::iter(patches.into_iter().enumerate()) .map({ let node_config = node_config.clone(); let stream_kind = stream_kind.clone(); - move |patch| { + move |(pix, patch)| { let query = PreBinnedQuery::new( patch, channel.clone(), @@ -66,10 +67,10 @@ where ); let ret: Pin + Send>> = match PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) { - Ok(k) => Box::pin(k), + Ok(stream) => Box::pin(stream.map(move |q| (pix, q))), Err(e) => { error!("error from PreBinnedValueFetchedStream::new {:?}", e); - Box::pin(futures_util::stream::iter(vec![Err(e)])) + Box::pin(futures_util::stream::iter(vec![(pix, Err(e))])) } }; ret @@ -78,7 +79,7 @@ where .flatten() .filter_map({ let range = range.clone(); - move |k| { + move |(pix, k)| { let fit_range = range.full_range(); let g = match k { Ok(item) => match item { @@ -86,7 +87,11 @@ where StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), StreamItem::DataItem(item) => match item { RangeCompletableItem::RangeComplete => { - Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) + if pix + 1 == pmax { + Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) + } else { + None + } } RangeCompletableItem::Data(item) => { match crate::binned::FilterFittingInside::filter_fitting_inside(item, fit_range) { diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 0b03984..695ce1e 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -12,23 +12,23 @@ use netpod::{NodeConfigCached, PerfOpts}; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct PreBinnedScalarValueFetchedStream +pub struct PreBinnedScalarValueFetchedStream where - BK: BinnedStreamKind, + SK: BinnedStreamKind, { uri: http::Uri, resfut: Option, res: Option>, errored: bool, completed: bool, - _stream_kind: BK, + _stream_kind: SK, } -impl PreBinnedScalarValueFetchedStream +impl PreBinnedScalarValueFetchedStream where - BK: BinnedStreamKind, + SK: BinnedStreamKind, { - pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: &BK) -> Result { + pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: &SK) -> Result { let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster); let node = &node_config.node_config.cluster.nodes[nodeix as usize]; let uri: hyper::Uri = format!( diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index a1f100e..5b8970a 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -52,23 +52,27 @@ fn get_binned_binary() { async fn get_binned_binary_inner() -> Result<(), Error> { let cluster = test_cluster(); let _hosts = spawn_test_hosts(cluster.clone()); - if false { + if true { get_binned_channel( "wave-f64-be-n21", "1970-01-01T00:20:10.000Z", "1970-01-01T00:20:30.000Z", 2, &cluster, + true, + 2, ) .await?; } - if false { + if true { get_binned_channel( "wave-u16-le-n77", "1970-01-01T01:11:00.000Z", - "1970-01-01T01:40:00.000Z", + "1970-01-01T01:35:00.000Z", 7, &cluster, + true, + 24, ) .await?; } @@ -79,6 +83,8 @@ async fn get_binned_binary_inner() -> Result<(), Error> { "1970-01-01T03:55:00.000Z", 2, &cluster, + true, + 3, ) .await?; } @@ -91,7 +97,9 @@ async fn get_binned_channel( end_date: S, bin_count: u32, cluster: &Cluster, -) -> Result<(), Error> + expect_range_complete: bool, + expect_bin_count: u64, +) -> Result where S: AsRef, { @@ -128,10 +136,13 @@ where let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms); if !res.is_valid() { - error!("result is not valid:\n{:?}", res); - Err(Error::with_msg("result not valid")) + Err(Error::with_msg(format!("invalid response: {:?}", res))) + } else if res.range_complete_count == 0 && expect_range_complete { + Err(Error::with_msg(format!("expect range complete: {:?}", res))) + } else if res.bin_count != expect_bin_count { + Err(Error::with_msg(format!("bin count mismatch: {:?}", res))) } else { - Ok(()) + Ok(res) } } diff --git a/taskrun/Cargo.toml b/taskrun/Cargo.toml index 79b2b02..bf3bbf3 100644 --- a/taskrun/Cargo.toml +++ b/taskrun/Cargo.toml @@ -9,4 +9,5 @@ tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "t tracing = "0.1.25" tracing-subscriber = "0.2.17" backtrace = "0.3.56" +lazy_static = "1.4.0" err = { path = "../err" } diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 394b842..3429ae3 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -1,6 +1,7 @@ use err::Error; use std::future::Future; use std::panic; +use std::sync::Mutex; use tokio::task::JoinHandle; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -49,16 +50,24 @@ pub fn run>>(f: F) -> Result } } +lazy_static::lazy_static! { + pub static ref INITMX: Mutex = Mutex::new(0); +} + pub fn tracing_init() { - tracing_subscriber::fmt() - //.with_timer(tracing_subscriber::fmt::time::uptime()) - .with_target(true) - .with_thread_names(true) - //.with_max_level(tracing::Level::INFO) - .with_env_filter(tracing_subscriber::EnvFilter::new( - "info,retrieval=trace,retrieval::test=trace,disk::raw::conn=info", - )) - .init(); + let mut g = INITMX.lock().unwrap(); + if *g == 0 { + tracing_subscriber::fmt() + //.with_timer(tracing_subscriber::fmt::time::uptime()) + .with_target(true) + .with_thread_names(true) + //.with_max_level(tracing::Level::INFO) + .with_env_filter(tracing_subscriber::EnvFilter::new( + "info,retrieval=trace,retrieval::test=trace,disk::raw::conn=info", + )) + .init(); + *g = 1; + } } pub fn spawn(task: T) -> JoinHandle