diff --git a/disk/src/agg/binnedt3.rs b/disk/src/agg/binnedt3.rs index d1690d9..278dfd2 100644 --- a/disk/src/agg/binnedt3.rs +++ b/disk/src/agg/binnedt3.rs @@ -37,7 +37,7 @@ impl Agg3 { } } - fn ingest(&mut self, item: &mut MinMaxAvgScalarEventBatch) { + fn ingest(&mut self, item: &MinMaxAvgScalarEventBatch) { for i1 in 0..item.tss.len() { let ts = item.tss[i1]; if ts < self.range.beg { @@ -185,11 +185,7 @@ impl BinnedT3Stream { // TODO cycle_current_bin enqueues the bin, can I return here instead? None } else { - let mut item = item; - err::todo(); - // TODO ingest the data into Agg3 - let item = item; - //if ag.ends_after(&item) { + ag.ingest(&item); if item.ends_after(ag.range.clone()) { self.left = Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index e4ad482..ef348d5 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -1,8 +1,8 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::StreamItem; +use crate::agg::streams::{Appendable, StreamItem}; use crate::agg::AggregatableXdim1Bin; -use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo}; +use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo, WithTimestamps}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; @@ -279,3 +279,16 @@ impl RangeOverlapInfo for MinMaxAvgScalarEventBatch { } } } + +impl Appendable for MinMaxAvgScalarEventBatch { + fn empty() -> Self { + Self::empty() + } + + fn append(&mut self, src: &Self) { + self.tss.extend_from_slice(&src.tss); + self.mins.extend_from_slice(&src.mins); + self.maxs.extend_from_slice(&src.maxs); + self.avgs.extend_from_slice(&src.avgs); + } +} diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 37a98d7..5315094 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -1,5 +1,5 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; -use crate::agg::streams::{Bins, StreamItem}; +use crate::agg::streams::{Appendable, Bins, StreamItem}; use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem}; use crate::frame::makeframe::make_frame; @@ -316,3 +316,18 @@ impl MakeBytesFrame for Result Self { + Self::empty() + } + + fn append(&mut self, src: &Self) { + self.ts1s.extend_from_slice(&src.ts1s); + self.ts2s.extend_from_slice(&src.ts2s); + self.counts.extend_from_slice(&src.counts); + self.mins.extend_from_slice(&src.mins); + self.maxs.extend_from_slice(&src.maxs); + self.avgs.extend_from_slice(&src.avgs); + } +} diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index 81d904b..8ca70da 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -22,6 +22,8 @@ pub trait Bins { fn bin_count(&self) -> u32; } +// TODO this is meant for bins, count them, collect range-complete, and deliver these +// information also to the client. pub trait Collected { fn new(bin_count_exp: u32) -> Self; fn timed_out(&mut self, k: bool); @@ -36,3 +38,8 @@ pub trait ToJsonResult { type Output; fn to_json_result(&self) -> Result; } + +pub trait Appendable { + fn empty() -> Self; + fn append(&mut self, src: &Self); +} diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 468a057..61ae5c7 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -3,7 +3,7 @@ use crate::agg::binnedt2::AggregatableTdim2; use crate::agg::binnedt3::{Agg3, BinnedT3Stream}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator}; -use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult}; +use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult}; use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; use crate::binned::scalar::binned_stream; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; @@ -74,6 +74,20 @@ impl Collected for MinMaxAvgScalarBinBatchCollected { } } +impl Collectable for MinMaxAvgScalarBinBatch { + type Collected = MinMaxAvgScalarBinBatchCollected; + + fn append_to(&self, collected: &mut Self::Collected) { + let batch = &mut collected.batch; + batch.ts1s.extend_from_slice(&self.ts1s); + batch.ts2s.extend_from_slice(&self.ts2s); + batch.counts.extend_from_slice(&self.counts); + batch.mins.extend_from_slice(&self.mins); + batch.maxs.extend_from_slice(&self.maxs); + batch.avgs.extend_from_slice(&self.avgs); + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct MinMaxAvgScalarBinBatchCollectedJsonResult { ts_bin_edges: Vec, @@ -437,49 +451,6 @@ impl PushableIndex for MinMaxAvgScalarEventBatch { } } -impl Collected for MinMaxAvgScalarEventBatch { - // TODO for this case we don't have an expected number of events. Factor out into another trait? - fn new(bin_count_exp: u32) -> Self { - // TODO factor out the concept of RangeComplete into another trait layer: - MinMaxAvgScalarEventBatch::empty() - } - - fn timed_out(&mut self, k: bool) {} -} - -impl Collectable for MinMaxAvgScalarEventBatch { - type Collected = MinMaxAvgScalarEventBatch; - - fn append_to(&self, collected: &mut Self::Collected) { - // TODO create separate traits for different concerns: - // Some occasion I want to just append. - // In other case, I need to collect also timeout flag, missing bin count and such. - collected.tss.extend_from_slice(&self.tss); - collected.mins.extend_from_slice(&self.mins); - collected.maxs.extend_from_slice(&self.maxs); - collected.avgs.extend_from_slice(&self.avgs); - } -} - -impl Collected for MinMaxAvgScalarBinBatch { - fn new(bin_count_exp: u32) -> Self { - MinMaxAvgScalarBinBatch::empty() - } - fn timed_out(&mut self, k: bool) {} -} - -impl Collectable for MinMaxAvgScalarBinBatch { - type Collected = MinMaxAvgScalarBinBatch; - fn append_to(&self, collected: &mut Self::Collected) { - collected.ts1s.extend_from_slice(&self.ts1s); - collected.ts2s.extend_from_slice(&self.ts2s); - collected.counts.extend_from_slice(&self.counts); - collected.mins.extend_from_slice(&self.mins); - collected.maxs.extend_from_slice(&self.maxs); - collected.avgs.extend_from_slice(&self.avgs); - } -} - pub trait RangeOverlapInfo { fn ends_before(&self, range: NanoRange) -> bool; fn ends_after(&self, range: NanoRange) -> bool; @@ -492,12 +463,11 @@ pub trait XBinnedEvents: + Send + Serialize + DeserializeOwned - + Collectable - + Collected + AggregatableTdim + WithLen + WithTimestamps + PushableIndex + + Appendable where SK: BinnedStreamKind, { @@ -511,11 +481,11 @@ pub trait TBinnedBins: + Serialize + DeserializeOwned + Collectable - + Collected + ReadableFromFile + FilterFittingInside + AggregatableTdim2 + WithLen + + Appendable { fn frame_type() -> u32; } @@ -565,9 +535,8 @@ pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static ) -> Result; fn xbinned_to_tbinned(inp: S, spec: BinnedRange) -> Self::XBinnedToTBinnedStream -/*where - S: Stream::XBinnedEvents>>, Error>> - + Unpin*/; + where + S: Stream>, Error>> + Send + 'static; } #[derive(Clone)] @@ -635,8 +604,11 @@ impl BinnedStreamKind for BinnedStreamKindScalar { Ok(BoxedStream::new(Box::pin(s))?) } - fn xbinned_to_tbinned(inp: S, spec: BinnedRange) -> Self::XBinnedToTBinnedStream { - err::todoval() + fn xbinned_to_tbinned(inp: S, spec: BinnedRange) -> Self::XBinnedToTBinnedStream + where + S: Stream>, Error>> + Send + 'static, + { + Self::XBinnedToTBinnedStream::new(inp, spec) } } diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 7f3aeab..8c819d0 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -1,5 +1,5 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::{Collectable, Collected, StreamItem}; +use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem}; use crate::binned::RangeCompletableItem::RangeComplete; use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; @@ -131,8 +131,7 @@ where errored: false, completed: false, streamlog: Streamlog::new(node_config.ix as u32), - // TODO refactor usage of parameter - values: <::TBinnedBins as Collected>::new(0), + values: <::TBinnedBins as Appendable>::empty(), write_fut: None, read_cache_fut: None, stream_kind, @@ -310,6 +309,7 @@ where } else { match self.query.cache_usage { super::CacheUsage::Use | super::CacheUsage::Recreate => { + err::todo(); let msg = format!( "write cache file query: {:?} bin count: {}", self.query.patch, @@ -320,8 +320,7 @@ where self.streamlog.append(Level::INFO, msg); let values = std::mem::replace( &mut self.values, - // Do not use expectation on the number of bins here: - <::TBinnedBins as Collected>::new(0), + <::TBinnedBins as Appendable>::empty(), ); let fut = super::write_pb_cache_min_max_avg_scalar( values, @@ -351,10 +350,7 @@ where continue 'outer; } RangeCompletableItem::Data(item) => { - // TODO need trait Appendable which simply appends to the same type, so that I can - // write later the whole batch of numbers in one go. - err::todo(); - //item.append_to(&mut self.values); + self.values.append(&item); Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) } }, diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 417a8a8..b4f90e7 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,6 +1,6 @@ use crate::agg::binnedt::AggregatableTdim; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; -use crate::agg::streams::{Collectable, Collected, StatsItem, StreamItem}; +use crate::agg::streams::{Appendable, Collectable, Collected, StatsItem, StreamItem}; use crate::binned::{BinnedStreamKind, PushableIndex, RangeCompletableItem, WithLen, WithTimestamps}; use crate::streamlog::LogItem; use err::Error; @@ -47,7 +47,7 @@ where ixs: vec![0; n], errored: false, completed: false, - batch: <::XBinnedEvents as Collected>::new(0), + batch: <::XBinnedEvents as Appendable>::empty(), ts_last_emit: 0, range_complete_observed: vec![false; n], range_complete_observed_all: false, @@ -183,7 +183,7 @@ where if self.batch.len() != 0 { //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); - let emp = <::XBinnedEvents as Collected>::new(0); + let emp = <::XBinnedEvents as Appendable>::empty(); let ret = std::mem::replace(&mut self.batch, emp); self.data_emit_complete = true; Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) @@ -193,7 +193,7 @@ where } } else { assert!(lowest_ts >= self.ts_last_emit); - let emp = <::XBinnedEvents as Collected>::new(0); + let emp = <::XBinnedEvents as Appendable>::empty(); let mut local_batch = std::mem::replace(&mut self.batch, emp); self.ts_last_emit = lowest_ts; let rix = self.ixs[lowest_ix]; @@ -227,7 +227,7 @@ where if self.batch.len() >= self.batch_size { //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); - let emp = <::XBinnedEvents as Collected>::new(0); + let emp = <::XBinnedEvents as Appendable>::empty(); let ret = std::mem::replace(&mut self.batch, emp); Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } else { diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 5129dda..261e2d8 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -61,7 +61,10 @@ async fn events_conn_handler_inner( make_frame::>, Error>>(&Err(ce.err))?; match ce.netout.write_all(&buf).await { Ok(_) => (), - Err(e) => return Err(e)?, + Err(e) => { + error!("events_conn_handler_inner sees: {:?}", e); + return Err(e)?; + } } } } diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 6465116..b305e41 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -44,11 +44,11 @@ fn test_cluster() -> Cluster { } #[test] -fn get_binned() { - taskrun::run(get_binned_0_inner()).unwrap(); +fn get_binned_binary() { + taskrun::run(get_binned_binary_inner()).unwrap(); } -async fn get_binned_0_inner() -> Result<(), Error> { +async fn get_binned_binary_inner() -> Result<(), Error> { let cluster = test_cluster(); let _hosts = spawn_test_hosts(cluster.clone()); get_binned_channel( @@ -115,6 +115,7 @@ where let req = hyper::Request::builder() .method(http::Method::GET) .uri(uri) + .header("accept", "application/octet-stream") .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?;