diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 5b14820..7aa9af9 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -404,8 +404,8 @@ where match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { let inst1 = Instant::now(); - let u = match &k { - EventChunkerItem::Events(events) => match self.process_event_data(events) { + let u = match k { + EventChunkerItem::Events(events) => match self.process_event_data(&events) { Ok(k) => { let ret = Dim1F32StreamItem::Values(k); Ready(Some(Ok(ret))) @@ -416,10 +416,10 @@ where } }, EventChunkerItem::RangeComplete => Ready(Some(Ok(Dim1F32StreamItem::RangeComplete))), - EventChunkerItem::EventDataReadStats(_stats) => { - // TODO ret.event_data_read_stats.trans(&mut k.event_data_read_stats); - // TODO ret.values_extract_stats.dur += inst2.duration_since(inst1); - err::todoval() + EventChunkerItem::EventDataReadStats(stats) => { + info!("++++++++ Dim1F32Stream stats {:?}", stats); + let ret = Dim1F32StreamItem::EventDataReadStats(stats); + Ready(Some(Ok(ret))) } }; let inst2 = Instant::now(); diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 7ddfdc2..9ee2e32 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -4,7 +4,6 @@ use crate::agg::AggregatableXdim1Bin; use crate::streamlog::LogItem; use bytes::{BufMut, Bytes, BytesMut}; use netpod::log::*; -use netpod::timeunits::SEC; use netpod::EventDataReadStats; use serde::{Deserialize, Serialize}; use std::mem::size_of; @@ -196,56 +195,25 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { fn ends_after(&self, inp: &Self::InputValue) -> bool { match inp.tss.last() { Some(&ts) => ts >= self.ts2, - _ => panic!(), + None => panic!(), } } fn starts_after(&self, inp: &Self::InputValue) -> bool { match inp.tss.first() { Some(&ts) => ts >= self.ts2, - _ => panic!(), + None => panic!(), } } fn ingest(&mut self, v: &mut Self::InputValue) { - if false { - trace!( - "ingest {} {} {} {:?} {:?}", - self.ends_before(v), - self.ends_after(v), - self.starts_after(v), - v.tss.first().map(|k| k / SEC), - v.tss.last().map(|k| k / SEC), - ); - } for i1 in 0..v.tss.len() { let ts = v.tss[i1]; if ts < self.ts1 { - trace!( - "EventBatchAgg {} {} {} {} IS BEFORE", - v.tss[i1], - v.mins[i1], - v.maxs[i1], - v.avgs[i1] - ); continue; } else if ts >= self.ts2 { - trace!( - "EventBatchAgg {} {} {} {} IS AFTER", - v.tss[i1], - v.mins[i1], - v.maxs[i1], - v.avgs[i1] - ); continue; } else { - trace!( - "EventBatchAgg {} {} {} {}", - v.tss[i1], - v.mins[i1], - v.maxs[i1], - v.avgs[i1] - ); self.min = self.min.min(v.mins[i1]); self.max = self.max.max(v.maxs[i1]); self.sum += v.avgs[i1]; @@ -375,14 +343,17 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator { fn ingest(&mut self, inp: &mut Self::InputValue) { match inp { MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals), - MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats), - MinMaxAvgScalarEventBatchStreamItem::RangeComplete => (), - MinMaxAvgScalarEventBatchStreamItem::Log(_) => (), + MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => { + info!("33333333333 2222222222222222222222 see stats {:?}", stats); + self.event_data_read_stats.trans(stats); + } + MinMaxAvgScalarEventBatchStreamItem::RangeComplete => {} + MinMaxAvgScalarEventBatchStreamItem::Log(_) => {} } } fn result(self) -> Vec { - let mut ret: Vec = self + let mut ret: Vec<_> = self .agg .result() .into_iter() diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 51669b7..8aed020 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -403,14 +403,17 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator { fn ingest(&mut self, inp: &mut Self::InputValue) { match inp { MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals), - MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats), - MinMaxAvgScalarBinBatchStreamItem::RangeComplete => (), - MinMaxAvgScalarBinBatchStreamItem::Log(_) => (), + MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => { + info!("kkkkkkkkkkkkkkkkk 0000000000000000000 see stats {:?}", stats); + self.event_data_read_stats.trans(stats); + } + MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {} + MinMaxAvgScalarBinBatchStreamItem::Log(_) => {} } } fn result(self) -> Vec { - let mut ret: Vec = self + let mut ret: Vec<_> = self .agg .result() .into_iter() diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index f1f7f4d..dd86f9b 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -292,6 +292,7 @@ impl Stream for PreBinnedValueStream { Ready(Some(Ok(PreBinnedItem::Batch(batch)))) } Ok(PreBinnedItem::EventDataReadStats(stats)) => { + info!("PreBinnedValueStream as Stream seeing stats {:?}", stats); Ready(Some(Ok(PreBinnedItem::EventDataReadStats(stats)))) } Ok(PreBinnedItem::Log(item)) => Ready(Some(Ok(PreBinnedItem::Log(item)))), diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 9b1318a..0b49028 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -71,7 +71,15 @@ impl Stream for PreBinnedValueFetchedStream { pin_mut!(res); match res.poll_next(cx) { Ready(Some(Ok(frame))) => match decode_frame::>(&frame) { - Ok(Ok(item)) => Ready(Some(Ok(item))), + Ok(Ok(item)) => { + match &item { + PreBinnedItem::EventDataReadStats(stats) => { + info!("PreBinnedValueFetchedStream ✕ ✕ ✕ ✕ ✕ ✕ ✕ ✕ ✕ stats {:?}", stats); + } + _ => {} + } + Ready(Some(Ok(item))) + } Ok(Err(e)) => { self.errored = true; Ready(Some(Err(e))) diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 72dbc55..fa1b6d6 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -38,77 +38,47 @@ impl Stream for EventBlobsComplete { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use Poll::*; - if self.completed { - panic!("EventBlobsComplete poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } 'outer: loop { - let z = match &mut self.evs { - Some(evs) => match evs.poll_next_unpin(cx) { - Ready(Some(k)) => Ready(Some(k)), - Ready(None) => { - self.evs = None; - continue 'outer; - } - Pending => Pending, - }, - None => match self.file_chan.poll_next_unpin(cx) { - Ready(Some(k)) => match k { - Ok(file) => { - let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); - let chunker = - EventChunker::from_event_boundary(inp, self.channel_config.clone(), self.range.clone()); - self.evs = Some(chunker); + break if self.completed { + panic!("EventBlobsComplete poll_next on completed"); + } else if self.errored { + self.completed = true; + return Ready(None); + } else { + match &mut self.evs { + Some(evs) => match evs.poll_next_unpin(cx) { + Ready(Some(k)) => Ready(Some(k)), + Ready(None) => { + self.evs = None; continue 'outer; } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } + Pending => Pending, }, - Ready(None) => { - self.completed = true; - Ready(None) - } - Pending => Pending, - }, - }; - break z; - } - } -} - -pub fn event_blobs_complete( - query: &netpod::AggQuerySingleChannel, - node: Node, -) -> impl Stream> + Send { - let query = query.clone(); - let node = node.clone(); - async_stream::stream! { - let filerx = open_files(err::todoval(), err::todoval(), node); - while let Ok(fileres) = filerx.recv().await { - match fileres { - Ok(file) => { - let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = EventChunker::from_event_boundary(inp, err::todoval(), err::todoval()); - while let Some(evres) = chunker.next().await { - match evres { - Ok(evres) => { - yield Ok(evres); + None => match self.file_chan.poll_next_unpin(cx) { + Ready(Some(k)) => match k { + Ok(file) => { + let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); + let chunker = EventChunker::from_event_boundary( + inp, + self.channel_config.clone(), + self.range.clone(), + ); + self.evs = Some(chunker); + continue 'outer; } Err(e) => { - yield Err(e) + self.errored = true; + Ready(Some(Err(e))) } + }, + Ready(None) => { + self.completed = true; + Ready(None) } - } + Pending => Pending, + }, } - Err(e) => { - yield Err(e); - } - } + }; } } } diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 6c0fa74..7f936e0 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -21,6 +21,11 @@ pub struct EventChunker { range: NanoRange, seen_beyond_range: bool, sent_beyond_range: bool, + data_emit_complete: bool, + final_stats_sent: bool, + data_since_last_stats: u32, + stats_emit_interval: u32, + parsed_bytes: u64, } enum DataFileState { @@ -30,6 +35,7 @@ enum DataFileState { struct ParseResult { events: EventFull, + parsed_bytes: u64, } impl EventChunker { @@ -50,6 +56,11 @@ impl EventChunker { range, seen_beyond_range: false, sent_beyond_range: false, + data_emit_complete: false, + final_stats_sent: false, + data_since_last_stats: 0, + stats_emit_interval: 1, + parsed_bytes: 0, } } @@ -70,6 +81,11 @@ impl EventChunker { range, seen_beyond_range: false, sent_beyond_range: false, + data_emit_complete: false, + final_stats_sent: false, + data_since_last_stats: 0, + stats_emit_interval: 1, + parsed_bytes: 0, } } @@ -79,6 +95,7 @@ impl EventChunker { fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { let mut ret = EventFull::empty(); + let mut parsed_bytes = 0; use byteorder::{ReadBytesExt, BE}; loop { trace!("parse_buf LOOP buf len {} need_min {}", buf.len(), self.need_min); @@ -107,7 +124,7 @@ impl EventChunker { self.state = DataFileState::Event; self.need_min = 4; buf.advance(totlen); - // TODO ret.event_data_read_stats.parsed_bytes += totlen as u64; + parsed_bytes += totlen as u64; } } DataFileState::Event => { @@ -127,6 +144,7 @@ impl EventChunker { let pulse = sl.read_i64::().unwrap() as u64; if ts >= self.range.end { self.seen_beyond_range = true; + self.data_emit_complete = true; break; } if ts < self.range.beg { @@ -226,16 +244,17 @@ impl EventChunker { "TODO uncompressed event parsing not yet implemented" )))?; } - trace!("advance and reset need_min"); buf.advance(len as usize); - // TODO ret.event_data_read_stats.parsed_bytes += len as u64; + parsed_bytes += len as u64; self.need_min = 4; } } } } - trace!("AFTER PARSE LOOP len {}", ret.tss.len()); - Ok(ParseResult { events: ret }) + Ok(ParseResult { + events: ret, + parsed_bytes, + }) } } @@ -275,60 +294,83 @@ impl Stream for EventChunker { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - if self.completed { - panic!("EventChunker poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } - if self.seen_beyond_range { - if self.sent_beyond_range { - self.completed = true; - return Ready(None); - } else { - self.sent_beyond_range = true; - return Ready(Some(Ok(EventChunkerItem::RangeComplete))); - } - } - match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(mut fcr))) => { - let r = self.parse_buf(&mut fcr.buf); - match r { - Ok(res) => { - if fcr.buf.len() > 0 { - // TODO gather stats about this: - self.inp.put_back(fcr); - } - if self.need_min > 1024 * 8 { - let msg = format!("spurious EventChunker asks for need_min {}", self.need_min); - warn!("{}", msg); - self.errored = true; - Ready(Some(Err(Error::with_msg(msg)))) - } else { - let x = self.need_min; - self.inp.set_need_min(x); - let ret = EventChunkerItem::Events(res.events); - let ret = Ok(ret); - Ready(Some(ret)) - } - } - Err(e) => { - error!("EventChunker parse_buf returned error {:?}", e); - self.errored = true; - Ready(Some(Err(e.into()))) - } - } - } - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { + 'outer: loop { + break if self.completed { + panic!("EventChunker poll_next on completed"); + } else if self.errored { self.completed = true; Ready(None) - } - Pending => Pending, + } else if self.data_since_last_stats >= self.stats_emit_interval { + self.data_since_last_stats = 0; + let item = EventDataReadStats { + parsed_bytes: self.parsed_bytes, + }; + self.parsed_bytes = 0; + let ret = EventChunkerItem::EventDataReadStats(item); + Ready(Some(Ok(ret))) + } else if self.sent_beyond_range { + self.completed = true; + Ready(None) + } else if self.final_stats_sent { + self.sent_beyond_range = true; + if self.seen_beyond_range { + Ready(Some(Ok(EventChunkerItem::RangeComplete))) + } else { + continue 'outer; + } + } else if self.data_emit_complete { + self.data_since_last_stats = 0; + let item = EventDataReadStats { + parsed_bytes: self.parsed_bytes, + }; + self.parsed_bytes = 0; + warn!("EMIT FINAL STATS {:?}", item); + let ret = EventChunkerItem::EventDataReadStats(item); + self.final_stats_sent = true; + Ready(Some(Ok(ret))) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(mut fcr))) => { + let r = self.parse_buf(&mut fcr.buf); + match r { + Ok(res) => { + self.parsed_bytes += res.parsed_bytes; + if fcr.buf.len() > 0 { + // TODO gather stats about this: + self.inp.put_back(fcr); + } + if self.need_min > 1024 * 8 { + let msg = format!("spurious EventChunker asks for need_min {}", self.need_min); + warn!("{}", msg); + self.errored = true; + Ready(Some(Err(Error::with_msg(msg)))) + } else { + let x = self.need_min; + self.inp.set_need_min(x); + self.data_since_last_stats += 1; + let ret = EventChunkerItem::Events(res.events); + let ret = Ok(ret); + Ready(Some(ret)) + } + } + Err(e) => { + error!("EventChunker parse_buf returned error {:?}", e); + self.errored = true; + Ready(Some(Err(e.into()))) + } + } + } + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.data_emit_complete = true; + continue 'outer; + } + Pending => Pending, + } + }; } } } diff --git a/disk/src/merge.rs b/disk/src/merge.rs index ef4ac5f..6c028df 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -5,6 +5,7 @@ use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use netpod::EventDataReadStats; use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -166,6 +167,7 @@ where data_emit_complete: bool, batch_size: usize, logitems: VecDeque, + event_data_read_stats_items: VecDeque, } impl MergedMinMaxAvgScalarStream @@ -192,6 +194,7 @@ where data_emit_complete: false, batch_size: 64, logitems: VecDeque::new(), + event_data_read_stats_items: VecDeque::new(), } } @@ -227,9 +230,9 @@ where self.logitems.push_back(item); continue 'l1; } - MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_stats) => { - // TODO merge also the stats: either just sum, or sum up by input index. - todo!(); + MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => { + self.event_data_read_stats_items.push_back(stats); + continue 'l1; } }, Ready(Some(Err(e))) => { @@ -266,18 +269,17 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - if self.completed { - panic!("MergedMinMaxAvgScalarStream poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } - if let Some(item) = self.logitems.pop_front() { - return Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::Log(item)))); - } 'outer: loop { - break if self.data_emit_complete { + break if self.completed { + panic!("MergedMinMaxAvgScalarStream poll_next on completed"); + } else if self.errored { + self.completed = true; + Ready(None) + } else if let Some(item) = self.logitems.pop_front() { + Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::Log(item)))) + } else if let Some(item) = self.event_data_read_stats_items.pop_front() { + Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(item)))) + } else if self.data_emit_complete { if self.range_complete_observed_all { if self.range_complete_observed_all_emitted { self.completed = true; diff --git a/disk/src/raw.rs b/disk/src/raw.rs index ca7b1d4..c9803d9 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -49,9 +49,8 @@ pub async fn x_processed_stream_from_node( netout.flush().await?; netout.forget(); let frames = InMemoryFrameAsyncReadStream::new(netin); - let s2 = MinMaxAvgScalarEventBatchStreamFromFrames::new(frames); - let s3: Pin> + Send>> = Box::pin(s2); - Ok(s3) + let items = MinMaxAvgScalarEventBatchStreamFromFrames::new(frames); + Ok(Box::pin(items)) } pub fn crchex(t: T) -> String diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index 64a554e..fb7df64 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -4,8 +4,7 @@ use crate::frame::makeframe::decode_frame; use crate::raw::conn::RawConnOut; use err::Error; use futures_core::Stream; -use futures_util::pin_mut; -#[allow(unused_imports)] +use futures_util::StreamExt; use netpod::log::*; use std::pin::Pin; use std::task::{Context, Poll}; @@ -49,9 +48,7 @@ where return Ready(None); } loop { - let j = &mut self.inp; - pin_mut!(j); - break match j.poll_next(cx) { + break match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(frame))) => { type ExpectedType = RawConnOut; trace!( @@ -59,13 +56,25 @@ where frame.buf().len() ); match decode_frame::(&frame) { - Ok(item) => match item { - Ok(item) => Ready(Some(Ok(item))), - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) + Ok(item) => { + match item { + Ok(item) => { + match &item { + MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => { + info!("✒✒✒✒✒✒✒✒✒✒✒✒✒✒✒✒ MinMaxAvgScalarEventBatchStreamFromFrames stats {:?}", stats); + } + _ => { + info!("✒ ✒ ✒ ✒ other kind") + } + } + Ready(Some(Ok(item))) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) + } } - }, + } Err(e) => { error!( "MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {}", diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 5662388..a52c2a9 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,5 +1,5 @@ use crate::agg::binnedx::IntoBinnedXBins1; -use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem}; +use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem; use crate::agg::IntoDim1F32Stream; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::eventblobs::EventBlobsComplete; @@ -8,9 +8,7 @@ use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame}; use crate::raw::{EventQueryJsonStringFrame, EventsQuery}; use err::Error; use futures_util::StreamExt; -#[allow(unused_imports)] use netpod::log::*; -use netpod::timeunits::SEC; use netpod::{NodeConfigCached, Shape}; use std::net::SocketAddr; use tokio::io::AsyncWriteExt; @@ -56,10 +54,6 @@ async fn raw_conn_handler_inner( match raw_conn_handler_inner_try(stream, addr, node_config).await { Ok(_) => (), Err(mut ce) => { - /*error!( - "raw_conn_handler_inner CAUGHT ERROR AND TRY TO SEND OVER TCP {:?}", - ce.err - );*/ let buf = make_frame::(&Err(ce.err))?; match ce.netout.write_all(&buf).await { Ok(_) => (), @@ -89,7 +83,7 @@ async fn raw_conn_handler_inner_try( addr: SocketAddr, node_config: &NodeConfigCached, ) -> Result<(), ConnErr> { - debug!("raw_conn_handler SPAWNED for {:?}", addr); + let _ = addr; let (netin, mut netout) = stream.into_split(); let mut h = InMemoryFrameAsyncReadStream::new(netin); let mut frames = vec![]; @@ -108,8 +102,8 @@ async fn raw_conn_handler_inner_try( } } if frames.len() != 1 { - error!("expect a command frame"); - return Err((Error::with_msg("expect a command frame"), netout))?; + error!("missing command frame"); + return Err((Error::with_msg("missing command frame"), netout))?; } let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0]) { Ok(k) => k, @@ -119,8 +113,8 @@ async fn raw_conn_handler_inner_try( let evq = match res { Ok(k) => k, Err(e) => { - error!("can not parse json {:?}", e); - return Err((Error::with_msg("can not parse request json"), netout))?; + error!("json parse error: {:?}", e); + return Err((Error::with_msg("json parse error"), netout))?; } }; match dbconn::channel_exists(&evq.channel, &node_config).await { @@ -175,22 +169,21 @@ async fn raw_conn_handler_inner_try( query.buffer_size as usize, ) .into_dim_1_f32_stream() - .into_binned_x_bins_1(); + .into_binned_x_bins_1() + .map(|k| { + match &k { + Ok(MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats)) => { + info!("raw::conn ✑ ✑ ✑ ✑ ✑ ✑ seeing stats: {:?}", stats); + } + _ => {} + } + k + }); let mut e = 0; while let Some(item) = s1.next().await { match &item { - Ok(MinMaxAvgScalarEventBatchStreamItem::Values(k)) => { + Ok(MinMaxAvgScalarEventBatchStreamItem::Values(_)) => { e += 1; - if false { - trace!( - "emit items sp {:2} e {:3} len {:3} {:10?} {:10?}", - node_config.node.split, - e, - k.tss.len(), - k.tss.first().map(|k| k / SEC), - k.tss.last().map(|k| k / SEC), - ); - } } _ => (), } @@ -204,31 +197,6 @@ async fn raw_conn_handler_inner_try( } } } - if false { - // Manual test batch. - let mut batch = MinMaxAvgScalarEventBatch::empty(); - batch.tss.push(42); - batch.tss.push(43); - batch.mins.push(7.1); - batch.mins.push(7.2); - batch.maxs.push(8.3); - batch.maxs.push(8.4); - batch.avgs.push(9.5); - batch.avgs.push(9.6); - let batch = MinMaxAvgScalarEventBatchStreamItem::Values(batch); - let mut s1 = futures_util::stream::iter(vec![batch]).map(Result::Ok); - while let Some(item) = s1.next().await { - match make_frame::(&item) { - Ok(buf) => match netout.write_all(&buf).await { - Ok(_) => {} - Err(e) => return Err((e, netout))?, - }, - Err(e) => { - return Err((e, netout))?; - } - } - } - } let buf = make_term_frame(); match netout.write_all(&buf).await { Ok(_) => (), @@ -238,5 +206,6 @@ async fn raw_conn_handler_inner_try( Ok(_) => (), Err(e) => return Err((e, netout))?, } + let _total_written_value_items = e; Ok(()) } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index eada06a..eaae09d 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -45,7 +45,7 @@ async fn data_api_proxy(req: Request, node_config: NodeConfigCached) -> Re match data_api_proxy_try(req, &node_config).await { Ok(k) => Ok(k), Err(e) => { - error!("{:?}", e); + error!("data_api_proxy sees error: {:?}", e); Err(e) } } @@ -149,14 +149,6 @@ impl hyper::body::HttpBody for BodyStreamWrap { type Error = Error; fn poll_data(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll>> { - /* - use futures_core::stream::Stream; - let z: &mut async_channel::Receiver> = &mut self.0.receiver; - match Pin::new(z).poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(k) => Poll::Ready(k), - } - */ todo!() } @@ -208,8 +200,8 @@ where Pending => Pending, }, Err(e) => { - error!("PANIC CAUGHT in httpret::BodyStream: {:?}", e); - let e = Error::with_msg(format!("PANIC CAUGHT in httpret::BodyStream: {:?}", e)); + error!("panic caught in httpret::BodyStream: {:?}", e); + let e = Error::with_msg(format!("panic caught in httpret::BodyStream: {:?}", e)); Ready(Some(Err(e))) } } diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index bfbd2aa..1980d62 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -36,7 +36,8 @@ async fn go() -> Result<(), Error> { ClientType::Binned(opts) => { let beg = opts.beg.parse()?; let end = opts.end.parse()?; - retrieval::client::get_binned(opts.host, opts.port, opts.channel, beg, end, opts.bins).await?; + retrieval::client::get_binned(opts.host, opts.port, opts.backend, opts.channel, beg, end, opts.bins) + .await?; } }, SubCmd::GenerateTestData => { diff --git a/retrieval/src/cli.rs b/retrieval/src/cli.rs index e091424..8d155f5 100644 --- a/retrieval/src/cli.rs +++ b/retrieval/src/cli.rs @@ -40,6 +40,8 @@ pub struct BinnedClient { #[clap(long)] pub port: u16, #[clap(long)] + pub backend: String, + #[clap(long)] pub channel: String, #[clap(long)] pub beg: String, diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index 0c36a29..b1755a5 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -9,13 +9,14 @@ use netpod::log::*; pub async fn get_binned( host: String, port: u16, + channel_backend: String, channel_name: String, beg_date: DateTime, end_date: DateTime, bin_count: u32, ) -> Result<(), Error> { + info!("------- get_binned client"); let t1 = Utc::now(); - let channel_backend = "NOBACKEND"; let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let uri = format!( "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&beg_date={}&end_date={}&bin_count={}", @@ -27,27 +28,24 @@ pub async fn get_binned( end_date.format(date_fmt), bin_count, ); - info!("URI {:?}", uri); + info!("get_binned uri {:?}", uri); let req = hyper::Request::builder() .method(http::Method::GET) .uri(uri) .body(Body::empty())?; - info!("Request for {:?}", req); let client = hyper::Client::new(); let res = client.request(req).await?; - info!("client response {:?}", res); if res.status() != StatusCode::OK { - error!("Server error"); - return Err(Error::with_msg(format!("Server error"))); + error!("Server error {:?}", res); + return Err(Error::with_msg(format!("Server error {:?}", res))); } - //let (res_head, mut res_body) = res.into_parts(); let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); let s2 = InMemoryFrameAsyncReadStream::new(s1); use futures_util::StreamExt; use std::future::ready; let mut bin_count = 0; let s3 = s2 - .map_err(|e| error!("{:?}", e)) + .map_err(|e| error!("get_binned {:?}", e)) .filter_map(|item| { let g = match item { Ok(frame) => { diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index a67bdbe..224e6ba 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -55,6 +55,9 @@ async fn get_binned_0_inner() -> Result<(), Error> { &cluster, ) .await?; + if true { + return Ok(()); + } get_binned_channel( "wave-u16-le-n77", "1970-01-01T01:11:00.000Z", @@ -71,9 +74,6 @@ async fn get_binned_0_inner() -> Result<(), Error> { &cluster, ) .await?; - if true { - return Ok(()); - } Ok(()) } @@ -94,7 +94,7 @@ where let channel_backend = "testbackend"; let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let uri = format!( - "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}", + "http://{}:{}/api/1/binned?cache_usage=ignore&channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}", node0.host, node0.port, channel_backend, @@ -125,11 +125,15 @@ where #[derive(Debug)] pub struct BinnedResponse { bin_count: usize, + bytes_read: u64, } impl BinnedResponse { pub fn new() -> Self { - Self { bin_count: 0 } + Self { + bin_count: 0, + bytes_read: 0, + } } } @@ -178,6 +182,10 @@ where a.bin_count += k.ts1s.len(); Ok(a) } + Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)) => { + a.bytes_read += stats.parsed_bytes; + Ok(a) + } Ok(_) => Ok(a), Err(e) => Err(e), }, @@ -186,6 +194,7 @@ where ready(g) }); let ret = s1.await; + info!("BinnedResponse: {:?}", ret); ret }