diff --git a/dbconn/src/bincache.rs b/dbconn/src/bincache.rs index a383c8b..f1621df 100644 --- a/dbconn/src/bincache.rs +++ b/dbconn/src/bincache.rs @@ -151,11 +151,11 @@ pub fn write_cached_scylla<'a>( let offset = coord.ix(); warn!( "write_cached_scylla len {} where series = {} and bin_len_sec = {} and patch_len_sec = {} and agg_kind = 'dummy-agg-kind' and offset = {}", + data.counts().len(), series, bin_len_sec, patch_len_sec, offset, - data.counts().len() ); let stmt = scy.prepare("insert into binned_scalar_f32 (series, bin_len_sec, patch_len_sec, agg_kind, offset, counts, avgs, mins, maxs) values (?, ?, ?, 'dummy-agg-kind', ?, ?, ?, ?, ?)").await.err_conv()?; scy.execute( @@ -194,7 +194,23 @@ pub async fn fetch_uncached_data( // Try to find a higher resolution pre-binned grid which covers the requested patch. let (bin, complete) = match PreBinnedPatchRange::covering_range(coord.patch_range(), coord.bin_count() + 1) { Ok(Some(range)) => { - fetch_uncached_higher_res_prebinned(series, &chn, range, agg_kind, cache_usage.clone(), scy.clone()).await + if coord.patch_range() != range.range() { + error!( + "The chosen covering range does not exactly cover the requested patch {:?} vs {:?}", + coord.patch_range(), + range.range() + ); + } + fetch_uncached_higher_res_prebinned( + series, + &chn, + coord.clone(), + range, + agg_kind, + cache_usage.clone(), + scy.clone(), + ) + .await } Ok(None) => fetch_uncached_binned_events(series, &chn, coord.clone(), agg_kind, scy.clone()).await, Err(e) => Err(e), @@ -243,52 +259,75 @@ pub fn fetch_uncached_data_box( pub async fn fetch_uncached_higher_res_prebinned( series: u64, chn: &ChannelTyped, + coord: PreBinnedPatchCoord, range: PreBinnedPatchRange, agg_kind: AggKind, cache_usage: CacheUsage, scy: Arc, ) -> Result<(Box, bool), Error> { + let edges = coord.edges(); // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. let do_time_weight = true; // We must produce some result with correct types even if upstream delivers nothing at all. let bin0 = empty_binned_dyn(&chn.scalar_type, &chn.shape, &agg_kind); - let mut time_binner = bin0.time_binner_new(range.edges(), do_time_weight); + let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); let mut complete = true; let patch_it = PreBinnedPatchIterator::from_range(range.clone()); - for patch in patch_it { + for patch_coord in patch_it { // We request data here for a Coord, meaning that we expect to receive multiple bins. // The expectation is that we receive a single TimeBinned which contains all bins of that PatchCoord. - let coord = PreBinnedPatchCoord::new(patch.bin_t_len(), patch.patch_t_len(), patch.ix()); - let (bin, comp) = - pre_binned_value_stream_with_scy(series, chn, &coord, agg_kind.clone(), cache_usage.clone(), scy.clone()) - .await?; - complete = complete & comp; + //let patch_coord = PreBinnedPatchCoord::new(patch.bin_t_len(), patch.patch_t_len(), patch.ix()); + let (bin, comp) = pre_binned_value_stream_with_scy( + series, + chn, + &patch_coord, + agg_kind.clone(), + cache_usage.clone(), + scy.clone(), + ) + .await?; + if let Err(msg) = bin.validate() { + error!( + "pre-binned intermediate issue {} coord {:?} patch_coord {:?}", + msg, coord, patch_coord + ); + } + complete = complete && comp; time_binner.ingest(bin.as_time_binnable_dyn()); } // Fixed limit to defend against a malformed implementation: let mut i = 0; - while i < 80000 && time_binner.bins_ready_count() < range.bin_count() as usize { + while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize { + let n1 = time_binner.bins_ready_count(); if false { trace!( - "extra cycle {} {} {}", + "pre-binned extra cycle {} {} {}", i, time_binner.bins_ready_count(), - range.bin_count() + coord.bin_count() ); } time_binner.cycle(); i += 1; + if time_binner.bins_ready_count() <= n1 { + warn!("pre-binned cycle did not add another bin, break"); + break; + } } - if time_binner.bins_ready_count() < range.bin_count() as usize { + if time_binner.bins_ready_count() < coord.bin_count() as usize { return Err(Error::with_msg_no_trace(format!( - "unable to produce all bins for the patch range {} vs {}", + "pre-binned unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}", time_binner.bins_ready_count(), - range.bin_count(), + coord.bin_count(), + edges.len(), ))); } let ready = time_binner .bins_ready() .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch range")))?; + if let Err(msg) = ready.validate() { + error!("pre-binned final issue {} coord {:?}", msg, coord); + } Ok((ready, complete)) } @@ -299,16 +338,17 @@ pub async fn fetch_uncached_binned_events( agg_kind: AggKind, scy: Arc, ) -> Result<(Box, bool), Error> { + let edges = coord.edges(); // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. let do_time_weight = true; // We must produce some result with correct types even if upstream delivers nothing at all. let bin0 = empty_events_dyn(&chn.scalar_type, &chn.shape, &agg_kind); - let mut time_binner = bin0.time_binner_new(coord.edges(), do_time_weight); + let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); let deadline = Instant::now(); let deadline = deadline .checked_add(Duration::from_millis(6000)) .ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?; - let evq = RawEventsQuery::new(chn.channel.clone(), coord.patch_range(), AggKind::Plain); + let evq = RawEventsQuery::new(chn.channel.clone(), coord.patch_range(), agg_kind); let mut events_dyn = EventsStreamScylla::new(series, &evq, chn.scalar_type.clone(), chn.shape.clone(), scy, false); let mut complete = false; loop { @@ -343,9 +383,10 @@ pub async fn fetch_uncached_binned_events( // Fixed limit to defend against a malformed implementation: let mut i = 0; while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize { + let n1 = time_binner.bins_ready_count(); if false { trace!( - "extra cycle {} {} {}", + "events extra cycle {} {} {}", i, time_binner.bins_ready_count(), coord.bin_count() @@ -353,17 +394,25 @@ pub async fn fetch_uncached_binned_events( } time_binner.cycle(); i += 1; + if time_binner.bins_ready_count() <= n1 { + warn!("events cycle did not add another bin, break"); + break; + } } if time_binner.bins_ready_count() < coord.bin_count() as usize { return Err(Error::with_msg_no_trace(format!( - "unable to produce all bins for the patch range {} vs {}", + "events unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}", time_binner.bins_ready_count(), coord.bin_count(), + edges.len(), ))); } let ready = time_binner .bins_ready() .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch")))?; + if let Err(msg) = ready.validate() { + error!("time binned invalid {} coord {:?}", msg, coord); + } Ok((ready, complete)) } diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs index 8f28b47..048185f 100644 --- a/dbconn/src/dbconn.rs +++ b/dbconn/src/dbconn.rs @@ -5,12 +5,12 @@ pub mod search; pub mod pg { pub use tokio_postgres::{Client, Error}; } - use err::Error; use netpod::log::*; -use netpod::{Channel, Database, NodeConfigCached}; +use netpod::{Channel, Database, NodeConfigCached, ScyllaConfig}; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; +use scylla::Session as ScySession; use std::time::Duration; use tokio_postgres::{Client, NoTls}; @@ -92,6 +92,16 @@ pub async fn create_connection(db_config: &Database) -> Result { Ok(cl) } +pub async fn create_scylla_connection(scyconf: &ScyllaConfig) -> Result { + let scy = scylla::SessionBuilder::new() + .known_nodes(&scyconf.hosts) + .use_keyspace(&scyconf.keyspace, true) + .build() + .await + .err_conv()?; + Ok(scy) +} + pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -> Result { let cl = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl diff --git a/dbconn/src/events_scylla.rs b/dbconn/src/events_scylla.rs index 5b7a723..6092f5f 100644 --- a/dbconn/src/events_scylla.rs +++ b/dbconn/src/events_scylla.rs @@ -14,9 +14,9 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tokio_postgres::Client as PgClient; -macro_rules! impl_read_values_fut { +macro_rules! read_values { ($fname:ident, $self:expr, $ts_msp:expr) => {{ - let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone()); + let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone()); let fut = fut.map(|x| { match x { Ok(k) => { @@ -37,7 +37,8 @@ struct ReadValues { scalar_type: ScalarType, shape: Shape, range: NanoRange, - ts_msp: VecDeque, + ts_msps: VecDeque, + fwd: bool, fut: Pin, Error>> + Send>>, scy: Arc, } @@ -48,23 +49,29 @@ impl ReadValues { scalar_type: ScalarType, shape: Shape, range: NanoRange, - ts_msp: VecDeque, + ts_msps: VecDeque, + fwd: bool, scy: Arc, ) -> Self { - Self { + let mut ret = Self { series, scalar_type, shape, range, - ts_msp, - fut: Box::pin(futures_util::future::lazy(|_| panic!())), + ts_msps, + fwd, + fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( + "future not initialized", + )))), scy, - } + }; + ret.next(); + ret } fn next(&mut self) -> bool { - if let Some(ts_msp) = self.ts_msp.pop_front() { - self.fut = self.make_fut(ts_msp, self.ts_msp.len() > 1); + if let Some(ts_msp) = self.ts_msps.pop_front() { + self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 1); true } else { false @@ -76,23 +83,28 @@ impl ReadValues { ts_msp: u64, _has_more_msp: bool, ) -> Pin, Error>> + Send>> { - // TODO this also needs to differentiate on Shape. let fut = match &self.shape { Shape::Scalar => match &self.scalar_type { + ScalarType::I8 => { + read_values!(read_next_values_scalar_i8, self, ts_msp) + } + ScalarType::I16 => { + read_values!(read_next_values_scalar_i16, self, ts_msp) + } ScalarType::I32 => { - impl_read_values_fut!(read_next_values_scalar_i32, self, ts_msp) + read_values!(read_next_values_scalar_i32, self, ts_msp) } ScalarType::F32 => { - impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp) + read_values!(read_next_values_scalar_f32, self, ts_msp) } ScalarType::F64 => { - impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp) + read_values!(read_next_values_scalar_f64, self, ts_msp) } _ => err::todoval(), }, Shape::Wave(_) => match &self.scalar_type { ScalarType::U16 => { - impl_read_values_fut!(read_next_values_array_u16, self, ts_msp) + read_values!(read_next_values_array_u16, self, ts_msp) } _ => err::todoval(), }, @@ -104,7 +116,9 @@ impl ReadValues { enum FrState { New, - FindMsp(Pin, Error>> + Send>>), + FindMsp(Pin, Error>> + Send>>), + ReadBack1(ReadValues), + ReadBack2(ReadValues), ReadValues(ReadValues), Done, } @@ -117,6 +131,7 @@ pub struct EventsStreamScylla { scalar_type: ScalarType, shape: Shape, range: NanoRange, + ts_msps: VecDeque, scy: Arc, do_test_stream_error: bool, } @@ -137,10 +152,129 @@ impl EventsStreamScylla { scalar_type, shape, range: evq.range.clone(), + ts_msps: VecDeque::new(), scy, do_test_stream_error, } } + + fn ts_msps_found(&mut self, ts_msps: VecDeque) { + info!("found ts_msps {ts_msps:?}"); + self.ts_msps = ts_msps; + // Find the largest MSP which can potentially contain some event before the range. + let befores: Vec<_> = self + .ts_msps + .iter() + .map(|x| *x) + .filter(|x| *x < self.range.beg) + .collect(); + if befores.len() >= 1 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + [befores[befores.len() - 1]].into(), + false, + self.scy.clone(), + ); + self.state = FrState::ReadBack1(st); + } else if self.ts_msps.len() >= 1 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.ts_msps.clone(), + true, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + } else { + self.state = FrState::Done; + } + } + + fn back_1_done(&mut self, item: Box) -> Option> { + info!("back_1_done len {}", item.len()); + if item.len() == 0 { + // Find the 2nd largest MSP which can potentially contain some event before the range. + let befores: Vec<_> = self + .ts_msps + .iter() + .map(|x| *x) + .filter(|x| *x < self.range.beg) + .collect(); + if befores.len() >= 2 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + [befores[befores.len() - 2]].into(), + false, + self.scy.clone(), + ); + self.state = FrState::ReadBack2(st); + None + } else if self.ts_msps.len() >= 1 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.ts_msps.clone(), + true, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + None + } else { + self.state = FrState::Done; + None + } + } else { + if self.ts_msps.len() > 0 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.ts_msps.clone(), + true, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + Some(item) + } else { + self.state = FrState::Done; + Some(item) + } + } + } + + fn back_2_done(&mut self, item: Box) -> Option> { + info!("back_2_done len {}", item.len()); + if self.ts_msps.len() >= 1 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.ts_msps.clone(), + true, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + } else { + self.state = FrState::Done; + } + if item.len() > 0 { + Some(item) + } else { + None + } + } } impl Stream for EventsStreamScylla { @@ -162,23 +296,8 @@ impl Stream for EventsStreamScylla { continue; } FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok(ts_msp)) => { - info!("found ts_msp {ts_msp:?}"); - // TODO get rid of into() for VecDeque - let mut st = ReadValues::new( - self.series as i64, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - // TODO get rid of the conversion: - ts_msp.into(), - self.scy.clone(), - ); - if st.next() { - self.state = FrState::ReadValues(st); - } else { - self.state = FrState::Done; - } + Ready(Ok(ts_msps)) => { + self.ts_msps_found(ts_msps); continue; } Ready(Err(e)) => { @@ -187,10 +306,44 @@ impl Stream for EventsStreamScylla { } Pending => Pending, }, + FrState::ReadBack1(ref mut st) => match st.fut.poll_unpin(cx) { + Ready(Ok(item)) => { + if let Some(item) = self.back_1_done(item) { + item.verify(); + item.output_info(); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) + } else { + continue; + } + } + Ready(Err(e)) => { + self.state = FrState::Done; + Ready(Some(Err(e))) + } + Pending => Pending, + }, + FrState::ReadBack2(ref mut st) => match st.fut.poll_unpin(cx) { + Ready(Ok(item)) => { + if let Some(item) = self.back_2_done(item) { + item.verify(); + item.output_info(); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) + } else { + continue; + } + } + Ready(Err(e)) => { + self.state = FrState::Done; + Ready(Some(Err(e))) + } + Pending => Pending, + }, FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { Ready(Ok(item)) => { - if st.next() { - } else { + info!("read values"); + item.verify(); + item.output_info(); + if !st.next() { info!("ReadValues exhausted"); self.state = FrState::Done; } @@ -242,29 +395,28 @@ async fn find_series(channel: &Channel, pgclient: Arc) -> Result<(u64, Ok((series, scalar_type, shape)) } -async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { +async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { trace!("find_ts_msp series {} {:?}", series, range); // TODO use prepared statements - let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 1"; + let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2"; let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?; - let mut before = vec![]; + let mut before = VecDeque::new(); for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; - before.push(row.0 as u64); + before.push_front(row.0 as u64); } - trace!("FOUND BEFORE THE REQUESTED TIME: {} {:?}", before.len(), before); - let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; + let cql = "select ts_msp from ts_msp where series = ? and ts_msp > ? and ts_msp < ?"; let res = scy .query(cql, (series, range.beg as i64, range.end as i64)) .await .err_conv()?; - let mut ret = vec![]; - for x in before { - ret.push(x); + let mut ret = VecDeque::new(); + for h in before { + ret.push_back(h); } for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; - ret.push(row.0 as u64); + ret.push_back(row.0 as u64); } trace!("found in total {} rows", ret.len()); Ok(ret) @@ -276,47 +428,65 @@ macro_rules! read_next_scalar_values { series: i64, ts_msp: u64, range: NanoRange, + fwd: bool, scy: Arc, ) -> Result, Error> { type ST = $st; type SCYTY = $scyty; - trace!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); - let _ts_lsp_max = if range.end <= ts_msp { - // TODO we should not be here... - } else { - }; + if ts_msp >= range.end { + warn!("given ts_msp {} >= range.end {}", ts_msp, range.end); + } if range.end > i64::MAX as u64 { return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); } - let ts_lsp_max = range.end; - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp < ?" - ); - let res = scy - .query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) - .await - .err_conv()?; + let res = if fwd { + let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; + trace!( + "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", + ts_msp, + ts_lsp_min, + ts_lsp_max, + stringify!($fname) + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" + ); + scy.query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64)) + .await + .err_conv()? + } else { + let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + info!( + "BCK ts_msp {} ts_lsp_max {} range beg {} end {} {}", + ts_msp, + ts_lsp_max, + range.beg, + range.end, + stringify!($fname) + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" + ); + scy.query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) + .await + .err_conv()? + }; let mut ret = ScalarEvents::::empty(); - let mut discarded = 0; for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { let row = row.err_conv()?; let ts = ts_msp + row.0 as u64; let pulse = row.1 as u64; let value = row.2 as ST; - if ts < range.beg || ts >= range.end { - discarded += 1; - } else { - ret.push(ts, pulse, value); - } + ret.push(ts, pulse, value); } - trace!( - "found in total {} events ts_msp {} discarded {}", - ret.tss.len(), - ts_msp, - discarded - ); + trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); Ok(ret) } }; @@ -328,8 +498,12 @@ macro_rules! read_next_array_values { series: i64, ts_msp: u64, _range: NanoRange, + _fwd: bool, scy: Arc, ) -> Result, Error> { + if true { + return Err(Error::with_msg_no_trace("redo based on scalar case")); + } type ST = $st; type SCYTY = $scyty; info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); @@ -353,6 +527,8 @@ macro_rules! read_next_array_values { }; } +read_next_scalar_values!(read_next_values_scalar_i8, i8, i8, "events_scalar_i8"); +read_next_scalar_values!(read_next_values_scalar_i16, i16, i16, "events_scalar_i16"); read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32"); read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 9997af4..d24cd90 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -116,14 +116,14 @@ pub async fn search_channel_scylla( let mut res = vec![]; for row in rows { let series = row.get::<_, i64>(0) as u64; - let facility: String = row.get(1); + let backend: String = row.get(1); let channel: String = row.get(2); let a: i32 = row.get(3); let scalar_type = ScalarType::from_scylla_i32(a)?; let a: Vec = row.get(4); let shape = Shape::from_scylla_shape_dims(&a)?; let k = ChannelSearchSingleResult { - backend: facility, + backend, name: channel, series, source: "".into(), diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 38b9f07..2202c76 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -219,7 +219,10 @@ where // TODO unordered cases if lowest_ts < self.ts_last_emit { self.errored = true; - let msg = format!("unordered event at lowest_ts {}", lowest_ts); + let msg = format!( + "unordered event at lowest_ts {} ts_last_emit {}", + lowest_ts, self.ts_last_emit + ); return Ready(Some(Err(Error::with_public_msg(msg)))); } else { self.ts_last_emit = self.ts_last_emit.max(lowest_ts); @@ -230,6 +233,12 @@ where match &self.current[lowest_ix] { MergedCurVal::Val(val) => { let mut ldst = batch.unwrap_or_else(|| val.empty_like_self()); + if false { + info!( + "Push event rix {} lowest_ix {} lowest_ts {}", + rix, lowest_ix, lowest_ts + ); + } ldst.push_index(val, rix); self.batch = Some(ldst); } diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 42aff04..4639d86 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -1,18 +1,21 @@ use crate::err::Error; use crate::{response, ToPublicResponse}; -use dbconn::create_connection; +use dbconn::{create_connection, create_scylla_connection}; use disk::binned::query::PreBinnedQuery; use disk::events::PlainEventsQuery; +use futures_util::StreamExt; use http::{Method, Request, Response, StatusCode}; use hyper::Body; use netpod::log::*; use netpod::query::BinnedQuery; +use netpod::timeunits::*; use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape}; use netpod::{ChannelConfigResponse, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON}; use scylla::batch::Consistency; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; +use scylla::transport::iterator::NextRowError; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::convert::TryInto; @@ -196,6 +199,15 @@ impl ErrConv for Result { } } +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + impl ErrConv for Result { fn err_conv(self) -> Result { match self { @@ -322,11 +334,11 @@ impl ScyllaConfigsHisto { .build() .await .err_conv()?; - let facility = "scylla"; + let backend = "scylla"; let res = scy .query( "select scalar_type, shape_dims, series from series_by_channel where facility = ? allow filtering", - (facility,), + (backend,), ) .await .err_conv()?; @@ -441,11 +453,11 @@ impl ScyllaChannelsWithType { .build() .await .err_conv()?; - let facility = "scylla"; + let backend = "scylla"; let res = scy .query( "select channel_name, series from series_by_channel where facility = ? and scalar_type = ? and shape_dims = ? allow filtering", - (facility, q.scalar_type.to_scylla_i32(), q.shape.to_scylla_vec()), + (backend, q.scalar_type.to_scylla_i32(), q.shape.to_scylla_vec()), ) .await .err_conv()?; @@ -453,7 +465,7 @@ impl ScyllaChannelsWithType { for row in res.rows_typed_or_empty::<(String, i64)>() { let (channel_name, series) = row.err_conv()?; let ch = Channel { - backend: facility.into(), + backend: backend.into(), name: channel_name, series: Some(series as u64), }; @@ -466,9 +478,9 @@ impl ScyllaChannelsWithType { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ScyllaChannelEventSeriesIdQuery { - facility: String, + backend: String, #[serde(rename = "channelName")] - channel_name: String, + name: String, #[serde(rename = "scalarType")] scalar_type: ScalarType, shape: Shape, @@ -487,11 +499,11 @@ impl FromUrl for ScyllaChannelEventSeriesIdQuery { } fn from_pairs(pairs: &BTreeMap) -> Result { - let facility = pairs - .get("facility") - .ok_or_else(|| Error::with_public_msg_no_trace("missing facility"))? + let backend = pairs + .get("backend") + .ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))? .into(); - let channel_name = pairs + let name = pairs .get("channelName") .ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))? .into(); @@ -505,8 +517,8 @@ impl FromUrl for ScyllaChannelEventSeriesIdQuery { let shape = Shape::from_dims_str(s)?; let do_create = pairs.get("doCreate").map_or("false", |x| x.as_str()) == "true"; Ok(Self { - facility, - channel_name, + backend, + name, scalar_type, shape, do_create, @@ -521,7 +533,7 @@ pub struct ScyllaChannelEventSeriesIdResponse { } /** -Get the series-id for a channel identified by facility, channel name, scalar type, shape. +Get the series-id for a channel identified by backend, channel name, scalar type, shape. */ pub struct ScyllaChannelEventSeriesId {} @@ -581,7 +593,7 @@ impl ScyllaChannelEventSeriesId { let res = pg_client .query( "select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0", - &[&q.facility, &q.channel_name, &q.scalar_type.to_scylla_i32(), &q.shape.to_scylla_vec()], + &[&q.backend, &q.name, &q.scalar_type.to_scylla_i32(), &q.shape.to_scylla_vec()], ) .await .err_conv()?; @@ -592,16 +604,13 @@ impl ScyllaChannelEventSeriesId { let ret = ScyllaChannelEventSeriesIdResponse { series }; Ok(ret) } else if q.do_create == false { - return Err(Error::with_msg_no_trace(format!( - "series id not found for {}", - q.channel_name - ))); + return Err(Error::with_msg_no_trace(format!("series id not found for {}", q.name))); } else { let tsbeg = Instant::now(); use md5::Digest; let mut h = md5::Md5::new(); - h.update(q.facility.as_bytes()); - h.update(q.channel_name.as_bytes()); + h.update(q.backend.as_bytes()); + h.update(q.name.as_bytes()); h.update(format!("{:?}", q.scalar_type).as_bytes()); h.update(format!("{:?}", q.shape).as_bytes()); for _ in 0..200 { @@ -628,8 +637,8 @@ impl ScyllaChannelEventSeriesId { ), &[ &(series as i64), - &q.facility, - &q.channel_name, + &q.backend, + &q.name, &q.scalar_type.to_scylla_i32(), &q.shape.to_scylla_vec(), ], @@ -642,18 +651,18 @@ impl ScyllaChannelEventSeriesId { } else { warn!( "tried to insert {series:?} for {} {} {:?} {:?} trying again...", - q.facility, q.channel_name, q.scalar_type, q.shape + q.backend, q.name, q.scalar_type, q.shape ); } tokio::time::sleep(Duration::from_millis(20)).await; } error!( "tried to insert new series id for {} {} {:?} {:?} but failed", - q.facility, q.channel_name, q.scalar_type, q.shape + q.backend, q.name, q.scalar_type, q.shape ); Err(Error::with_msg_no_trace(format!( "get_series_id can not create and insert series id {:?} {:?} {:?} {:?}", - q.facility, q.channel_name, q.scalar_type, q.shape + q.backend, q.name, q.scalar_type, q.shape ))) } } @@ -762,7 +771,6 @@ impl ScyllaChannelsActive { ) .await .err_conv()?; - use futures_util::StreamExt; while let Some(row) = res.next().await { let row = row.err_conv()?; let (series,): (i64,) = row.into_typed().err_conv()?; @@ -796,7 +804,7 @@ impl FromUrl for ChannelFromSeriesQuery { #[derive(Clone, Debug, Serialize)] pub struct ChannelFromSeriesResponse { - facility: String, + backend: String, #[serde(rename = "channelName")] channel: String, #[serde(rename = "scalarType")] @@ -881,7 +889,7 @@ impl ChannelFromSeries { // TODO return code 204 return Err(Error::with_msg_no_trace("can not find series")); }; - let facility: String = res.get(0); + let backend: String = res.get(0); let channel: String = res.get(1); let scalar_type: i32 = res.get(2); // TODO check and document the format in the storage: @@ -891,7 +899,7 @@ impl ChannelFromSeries { let agg_kind: i32 = res.get(4); // TODO method is called from_scylla_shape_dims but document that postgres uses the same format. let ret = ChannelFromSeriesResponse { - facility, + backend, channel, scalar_type, shape, @@ -903,9 +911,9 @@ impl ChannelFromSeries { #[derive(Clone, Debug, Deserialize)] pub struct IocForChannelQuery { - facility: String, + backend: String, #[serde(rename = "channelName")] - channel_name: String, + name: String, } impl FromUrl for IocForChannelQuery { @@ -915,15 +923,15 @@ impl FromUrl for IocForChannelQuery { } fn from_pairs(pairs: &BTreeMap) -> Result { - let facility = pairs - .get("facility") - .ok_or_else(|| Error::with_public_msg_no_trace("missing facility"))? + let backend = pairs + .get("backend") + .ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))? .into(); - let channel_name = pairs + let name = pairs .get("channelName") .ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))? .into(); - Ok(Self { facility, channel_name }) + Ok(Self { backend, name }) } } @@ -982,7 +990,7 @@ impl IocForChannel { let rows = pg_client .query( "select addr from ioc_by_channel where facility = $1 and channel = $2", - &[&q.facility, &q.channel_name], + &[&q.backend, &q.name], ) .await?; if let Some(row) = rows.first() { @@ -1082,7 +1090,6 @@ impl ScyllaSeriesTsMsp { .query_iter("select ts_msp from ts_msp where series = ?", (q.series as i64,)) .await .err_conv()?; - use futures_util::StreamExt; while let Some(row) = res.next().await { let row = row.err_conv()?; let (ts_msp,): (i64,) = row.into_typed().err_conv()?; @@ -1163,3 +1170,126 @@ impl AmbigiousChannelNames { Ok(ret) } } + +struct TestData01Iter {} + +impl Iterator for TestData01Iter { + type Item = f64; + + fn next(&mut self) -> Option { + None + } +} + +struct Msps(Vec); +struct Lsps(Vec); +struct Pulses(Vec); +struct ValsF64(Vec); + +fn test_data_f64_01() -> (Msps, Lsps, Pulses, ValsF64) { + let mut msps = Msps(Vec::new()); + let mut lsps = Lsps(Vec::new()); + let mut pulses = Pulses(Vec::new()); + let mut vals = ValsF64(Vec::new()); + let mut msp = 0; + let mut i1 = 0; + for i in 0..2000 { + let ts = SEC * 1600000000 + MIN * 2 * i; + let pulse = 10000 + i; + if msp == 0 || i1 >= 40 { + msp = ts / MIN * MIN; + i1 = 0; + } + msps.0.push(msp); + lsps.0.push(ts - msp); + pulses.0.push(pulse); + vals.0.push(pulse as f64 + 0.4 + 0.2 * (pulse as f64).sin()); + i1 += 1; + } + (msps, lsps, pulses, vals) +} + +pub struct GenerateScyllaTestData {} + +impl GenerateScyllaTestData { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/test/generate/scylla" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + match self.process(node_config).await { + Ok(k) => { + let body = Body::from(serde_json::to_vec(&k)?); + Ok(response(StatusCode::OK).body(body)?) + } + Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("{:?}", e.public_msg())))?), + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } + + async fn process(&self, node_config: &NodeConfigCached) -> Result<(), Error> { + let dbconf = &node_config.node_config.cluster.database; + let _pg_client = create_connection(dbconf).await?; + let scyconf = node_config.node_config.cluster.scylla.as_ref().unwrap(); + let scy = create_scylla_connection(scyconf).await?; + let series: u64 = 42001; + // TODO query `ts_msp` for all MSP values und use that to delete from event table first. + // Only later delete also from the `ts_msp` table. + let it = scy + .query_iter("select ts_msp from ts_msp where series = ?", (series as i64,)) + .await + .err_conv()?; + let mut it = it.into_typed::<(i64,)>(); + while let Some(row) = it.next().await { + let row = row.err_conv()?; + let values = (series as i64, row.0); + scy.query("delete from events_scalar_f64 where series = ? and ts_msp = ?", values) + .await + .err_conv()?; + } + scy.query("delete from ts_msp where series = ?", (series as i64,)) + .await + .err_conv()?; + + // Generate + let (msps, lsps, pulses, vals) = test_data_f64_01(); + let mut last = 0; + for msp in msps.0.iter().map(|x| *x) { + if msp != last { + scy.query( + "insert into ts_msp (series, ts_msp) values (?, ?)", + (series as i64, msp as i64), + ) + .await + .err_conv()?; + } + last = msp; + } + for (((msp, lsp), pulse), val) in msps.0.into_iter().zip(lsps.0).zip(pulses.0).zip(vals.0) { + scy.query( + "insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)", + (series as i64, msp as i64, lsp as i64, pulse as i64, val), + ) + .await + .err_conv()?; + } + Ok(()) + } +} diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 16d4ff4..4a91b42 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -237,6 +237,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> h.handle(req, &node_config).await } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = channelconfig::GenerateScyllaTestData::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = events::EventsHandler::handler(&req) { h.handle(req, &node_config).await } else if path == "/api/4/binned" { diff --git a/items/src/binsdim0.rs b/items/src/binsdim0.rs index 82908f7..eb9fec4 100644 --- a/items/src/binsdim0.rs +++ b/items/src/binsdim0.rs @@ -367,6 +367,8 @@ pub struct MinMaxAvgDim0BinsAggregator { count: u64, min: NTY, max: NTY, + // Carry over to next bin: + avg: f32, sumc: u64, sum: f32, } @@ -378,6 +380,7 @@ impl MinMaxAvgDim0BinsAggregator { count: 0, min: NTY::zero(), max: NTY::zero(), + avg: 0., sumc: 0, sum: 0f32, } @@ -401,14 +404,17 @@ where } else if item.ts2s[i1] <= self.range.beg { } else if item.ts1s[i1] >= self.range.end { } else { + if item.mins[i1].as_prim_f32() < 1. { + info!("small bin min {:?} counts {}", item.mins[i1], item.counts[i1]); + } if self.count == 0 { self.min = item.mins[i1].clone(); self.max = item.maxs[i1].clone(); } else { - if item.mins[i1] < self.min { + if self.min > item.mins[i1] { self.min = item.mins[i1].clone(); } - if item.maxs[i1] > self.max { + if self.max < item.maxs[i1] { self.max = item.maxs[i1].clone(); } } @@ -420,23 +426,19 @@ where } fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output { - let avg = if self.sumc == 0 { - 0f32 - } else { - self.sum / self.sumc as f32 - }; + if self.sumc > 0 { + self.avg = self.sum / self.sumc as f32; + } let ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![self.min.clone()], maxs: vec![self.max.clone()], - avgs: vec![avg], + avgs: vec![self.avg], }; - self.count = 0; - self.min = NTY::zero(); - self.max = NTY::zero(); self.range = range; + self.count = 0; self.sum = 0f32; self.sumc = 0; ret @@ -459,86 +461,35 @@ impl TimeBinnableDyn for MinMaxAvgDim0Bins { pub struct MinMaxAvgDim0BinsTimeBinner { edges: VecDeque, do_time_weight: bool, - range: NanoRange, agg: Option>, ready: Option< as TimeBinnableTypeAggregator>::Output>, } impl MinMaxAvgDim0BinsTimeBinner { fn new(edges: VecDeque, do_time_weight: bool) -> Self { - let range = if edges.len() >= 2 { - NanoRange { - beg: edges[0], - end: edges[1], - } - } else { - // Using a dummy for this case. - NanoRange { beg: 1, end: 2 } - }; Self { edges, do_time_weight, - range, agg: None, ready: None, } } - // Move the bin from the current aggregator (if any) to our output collection, - // and step forward in our bin list. - fn cycle(&mut self) { - eprintln!("cycle"); - // TODO where to take expand from? Is it still required after all? - let expand = true; - let have_next_bin = self.edges.len() >= 3; - let range_next = if have_next_bin { - NanoRange { - beg: self.edges[1], - end: self.edges[2], - } + fn next_bin_range(&mut self) -> Option { + if self.edges.len() >= 2 { + let ret = NanoRange { + beg: self.edges[0], + end: self.edges[1], + }; + self.edges.pop_front(); + Some(ret) } else { - // Using a dummy for this case. - NanoRange { beg: 1, end: 2 } - }; - if let Some(agg) = self.agg.as_mut() { - eprintln!("cycle: use existing agg: {:?}", agg.range); - let mut h = agg.result_reset(range_next.clone(), expand); - match self.ready.as_mut() { - Some(fin) => { - fin.append(&mut h); - } - None => { - self.ready = Some(h); - } - } - } else if have_next_bin { - eprintln!("cycle: append a zero bin"); - let mut h = MinMaxAvgDim0Bins::::empty(); - h.append_zero(self.range.beg, self.range.end); - match self.ready.as_mut() { - Some(fin) => { - fin.append(&mut h); - } - None => { - self.ready = Some(h); - } - } - } else { - eprintln!("cycle: no more next bin"); - } - self.range = range_next; - self.edges.pop_front(); - if !have_next_bin { - self.agg = None; + None } } } impl TimeBinnerDyn for MinMaxAvgDim0BinsTimeBinner { - fn cycle(&mut self) { - Self::cycle(self) - } - fn ingest(&mut self, item: &dyn TimeBinnableDyn) { const SELF: &str = "MinMaxAvgDim0BinsTimeBinner"; if item.len() == 0 { @@ -552,37 +503,48 @@ impl TimeBinnerDyn for MinMaxAvgDim0BinsTimeBinner { // TODO optimize by remembering at which event array index we have arrived. // That needs modified interfaces which can take and yield the start and latest index. loop { - while item.starts_after(self.range.clone()) { + while item.starts_after(NanoRange { + beg: 0, + end: self.edges[1], + }) { self.cycle(); if self.edges.len() < 2 { warn!("TimeBinnerDyn for {SELF} no more bin in edges B"); return; } } - if item.ends_before(self.range.clone()) { + if item.ends_before(NanoRange { + beg: self.edges[0], + end: u64::MAX, + }) { return; } else { if self.edges.len() < 2 { warn!("TimeBinnerDyn for {SELF} edge list exhausted"); return; } else { - if self.agg.is_none() { + let agg = if let Some(agg) = self.agg.as_mut() { + agg + } else { self.agg = Some(MinMaxAvgDim0BinsAggregator::new( - self.range.clone(), + // We know here that we have enough edges for another bin. + // and `next_bin_range` will pop the first edge. + self.next_bin_range().unwrap(), self.do_time_weight, )); - } - let agg = self.agg.as_mut().unwrap(); - if let Some(item) = - item.as_any() - .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + self.agg.as_mut().unwrap() + }; + if let Some(item) = item + .as_any() + // TODO make statically sure that we attempt to cast to the correct type here: + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() { agg.ingest(item); } else { let tyid_item = std::any::Any::type_id(item.as_any()); error!("not correct item type {:?}", tyid_item); }; - if item.ends_after(self.range.clone()) { + if item.ends_after(agg.range().clone()) { self.cycle(); if self.edges.len() < 2 { warn!("TimeBinnerDyn for {SELF} no more bin in edges C"); @@ -609,6 +571,53 @@ impl TimeBinnerDyn for MinMaxAvgDim0BinsTimeBinner { None => None, } } + + // TODO there is too much common code between implementors: + fn push_in_progress(&mut self, push_empty: bool) { + // TODO expand should be derived from AggKind. Is it still required after all? + let expand = true; + if let Some(agg) = self.agg.as_mut() { + let dummy_range = NanoRange { beg: 4, end: 5 }; + let mut bins = agg.result_reset(dummy_range, expand); + self.agg = None; + assert_eq!(bins.len(), 1); + if push_empty || bins.counts[0] != 0 { + match self.ready.as_mut() { + Some(ready) => { + ready.append(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + } + } + } + + // TODO there is too much common code between implementors: + fn cycle(&mut self) { + let n = self.bins_ready_count(); + self.push_in_progress(true); + if self.bins_ready_count() == n { + if let Some(range) = self.next_bin_range() { + let mut bins = MinMaxAvgDim0Bins::::empty(); + bins.append_zero(range.beg, range.end); + match self.ready.as_mut() { + Some(ready) => { + ready.append(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + if self.bins_ready_count() <= n { + error!("failed to push a zero bin"); + } + } else { + warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); + } + } + } } impl TimeBinned for MinMaxAvgDim0Bins { @@ -635,4 +644,22 @@ impl TimeBinned for MinMaxAvgDim0Bins { fn avgs(&self) -> Vec { self.avgs.clone() } + + fn validate(&self) -> Result<(), String> { + use std::fmt::Write; + let mut msg = String::new(); + if self.ts1s.len() != self.ts2s.len() { + write!(&mut msg, "ts1s ≠ ts2s\n").unwrap(); + } + for (i, ((count, min), max)) in self.counts.iter().zip(&self.mins).zip(&self.maxs).enumerate() { + if min.as_prim_f32() < 1. && *count != 0 { + write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap(); + } + } + if msg.is_empty() { + Ok(()) + } else { + Err(msg) + } + } } diff --git a/items/src/binsdim1.rs b/items/src/binsdim1.rs index 7528095..5247665 100644 --- a/items/src/binsdim1.rs +++ b/items/src/binsdim1.rs @@ -594,4 +594,8 @@ impl TimeBinned for MinMaxAvgDim1Bins { fn maxs(&self) -> Vec { err::todoval() } + + fn validate(&self) -> Result<(), String> { + err::todoval() + } } diff --git a/items/src/frame.rs b/items/src/frame.rs index a21a021..6de3723 100644 --- a/items/src/frame.rs +++ b/items/src/frame.rs @@ -1,7 +1,7 @@ use crate::inmem::InMemoryFrame; use crate::{ - FrameType, FrameTypeStatic, ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC, - TERM_FRAME_TYPE_ID, + FrameType, FrameTypeStatic, Sitemty, StreamItem, ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, + INMEM_FRAME_MAGIC, NON_DATA_FRAME_TYPE_ID, TERM_FRAME_TYPE_ID, }; use bytes::{BufMut, BytesMut}; use err::Error; @@ -144,6 +144,40 @@ where } }; Ok(T::from_error(k)) + } else if frame.tyid() == NON_DATA_FRAME_TYPE_ID { + error!("TODO NON_DATA_FRAME_TYPE_ID"); + type TT = Sitemty>; + let _k: TT = match bincode::deserialize::(frame.buf()) { + Ok(item) => match item { + Ok(StreamItem::DataItem(_)) => { + error!( + "ERROR bincode::deserialize len {} NON_DATA_FRAME_TYPE_ID but found Ok(StreamItem::DataItem)", + frame.buf().len() + ); + let n = frame.buf().len().min(64); + let s = String::from_utf8_lossy(&frame.buf()[..n]); + error!("frame.buf as string: {:?}", s); + Err(Error::with_msg_no_trace("NON_DATA_FRAME_TYPE_ID decode error"))? + } + Ok(StreamItem::Log(k)) => Ok(StreamItem::Log(k)), + Ok(StreamItem::Stats(k)) => Ok(StreamItem::Stats(k)), + Err(e) => { + error!("decode_frame sees error: {e:?}"); + Err(e) + } + }, + Err(e) => { + error!( + "ERROR bincode::deserialize len {} ERROR_FRAME_TYPE_ID", + frame.buf().len() + ); + let n = frame.buf().len().min(64); + let s = String::from_utf8_lossy(&frame.buf()[..n]); + error!("frame.buf as string: {:?}", s); + Err(e)? + } + }; + Err(Error::with_msg_no_trace("TODO NON_DATA_FRAME_TYPE_ID")) } else { let tyid = T::FRAME_TYPE_ID; if frame.tyid() != tyid { diff --git a/items/src/items.rs b/items/src/items.rs index 7bec7f1..80a3085 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -525,6 +525,8 @@ pub trait TimeBinnableDynAggregator: Send { /// Container of some form of events, for use as trait object. pub trait EventsDyn: TimeBinnableDyn { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn; + fn verify(&self); + fn output_info(&self); } /// Data in time-binned form. @@ -535,6 +537,7 @@ pub trait TimeBinned: TimeBinnableDyn { fn mins(&self) -> Vec; fn maxs(&self) -> Vec; fn avgs(&self) -> Vec; + fn validate(&self) -> Result<(), String>; } impl WithLen for Box { @@ -662,6 +665,8 @@ pub trait TimeBinnableTypeAggregator: Send { type Output: TimeBinnableType; fn range(&self) -> &NanoRange; fn ingest(&mut self, item: &Self::Input); + // TODO this API is too convoluted for a minimal performance gain: should separate `result` and `reset` + // or simply require to construct a new which is almost equally expensive. fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output; } @@ -690,8 +695,12 @@ pub trait TimeBinnerDyn: Send { fn bins_ready(&mut self) -> Option>; fn ingest(&mut self, item: &dyn TimeBinnableDyn); - /// Caller indicates that there will be no more data for the current bin. - /// Implementor is expected to prepare processing the next bin. + /// If there is a bin in progress with non-zero count, push it to the result set. + /// With push_empty == true, a bin in progress is pushed even if it contains no counts. + fn push_in_progress(&mut self, push_empty: bool); + + /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call + /// to `push_in_progress` did not change the result count, as long as edges are left. /// The next call to `Self::bins_ready_count` must return one higher count than before. fn cycle(&mut self); } @@ -777,7 +786,7 @@ pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK #[test] fn bin_binned_01() { use binsdim0::MinMaxAvgDim0Bins; - let edges = vec![SEC * 1000, SEC * 1010, SEC * 1020]; + let edges = vec![SEC * 1000, SEC * 1010, SEC * 1020, SEC * 1030]; let inp0 = as NewEmpty>::empty(Shape::Scalar); let mut time_binner = inp0.time_binner_new(edges, true); let inp1 = MinMaxAvgDim0Bins:: { @@ -791,12 +800,52 @@ fn bin_binned_01() { assert_eq!(time_binner.bins_ready_count(), 0); time_binner.ingest(&inp1); assert_eq!(time_binner.bins_ready_count(), 1); - time_binner.cycle(); + time_binner.push_in_progress(false); assert_eq!(time_binner.bins_ready_count(), 2); + // From here on, pushing any more should not change the bin count: + time_binner.push_in_progress(false); + assert_eq!(time_binner.bins_ready_count(), 2); + // On the other hand, cycling should add one more zero-bin: + time_binner.cycle(); + assert_eq!(time_binner.bins_ready_count(), 3); + time_binner.cycle(); + assert_eq!(time_binner.bins_ready_count(), 3); + let bins = time_binner.bins_ready().expect("bins should be ready"); + eprintln!("bins: {:?}", bins); + assert_eq!(time_binner.bins_ready_count(), 0); + assert_eq!(bins.counts(), &[1, 1, 0]); + // TODO use proper float-compare logic: + assert_eq!(bins.mins(), &[3., 4., 0.]); + assert_eq!(bins.maxs(), &[10., 9., 0.]); + assert_eq!(bins.avgs(), &[7., 6., 0.]); +} + +#[test] +fn bin_binned_02() { + use binsdim0::MinMaxAvgDim0Bins; + let edges = vec![SEC * 1000, SEC * 1020]; + let inp0 = as NewEmpty>::empty(Shape::Scalar); + let mut time_binner = inp0.time_binner_new(edges, true); + let inp1 = MinMaxAvgDim0Bins:: { + ts1s: vec![SEC * 1000, SEC * 1010], + ts2s: vec![SEC * 1010, SEC * 1020], + counts: vec![1, 1], + mins: vec![3, 4], + maxs: vec![10, 9], + avgs: vec![7., 6.], + }; + assert_eq!(time_binner.bins_ready_count(), 0); + time_binner.ingest(&inp1); + assert_eq!(time_binner.bins_ready_count(), 0); + time_binner.cycle(); + assert_eq!(time_binner.bins_ready_count(), 1); time_binner.cycle(); //assert_eq!(time_binner.bins_ready_count(), 2); let bins = time_binner.bins_ready().expect("bins should be ready"); eprintln!("bins: {:?}", bins); - assert_eq!(bins.counts().len(), 2); assert_eq!(time_binner.bins_ready_count(), 0); + assert_eq!(bins.counts(), &[2]); + assert_eq!(bins.mins(), &[3.]); + assert_eq!(bins.maxs(), &[10.]); + assert_eq!(bins.avgs(), &[13. / 2.]); } diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs index 29744f0..3868be5 100644 --- a/items/src/scalarevents.rs +++ b/items/src/scalarevents.rs @@ -369,9 +369,10 @@ pub struct EventValuesAggregator { impl Drop for EventValuesAggregator { fn drop(&mut self) { // TODO collect as stats for the request context: - warn!( + trace!( "taken {} ignored {}", - self.events_taken_count, self.events_ignored_count + self.events_taken_count, + self.events_ignored_count ); } } @@ -387,7 +388,7 @@ where count: 0, min: NTY::zero(), max: NTY::zero(), - sum: 0f32, + sum: 0., sumc: 0, int_ts, last_ts: 0, @@ -402,13 +403,13 @@ where fn apply_min_max(&mut self, val: NTY) { if self.count == 0 { self.min = val.clone(); - self.max = val; + self.max = val.clone(); } else { - if val < self.min { + if self.min > val { self.min = val.clone(); } - if val > self.max { - self.max = val; + if self.max < val { + self.max = val.clone(); } } } @@ -469,6 +470,12 @@ where let ts = item.tss[i1]; let val = item.values[i1].clone(); if ts < self.int_ts { + if self.last_val.is_none() { + info!( + "ingest_time_weight event before range, only set last ts {} val {:?}", + ts, val + ); + } self.events_ignored_count += 1; self.last_ts = ts; self.last_val = Some(val); @@ -476,8 +483,14 @@ where self.events_ignored_count += 1; return; } else { - debug!("regular"); self.apply_event_time_weight(ts); + if self.last_val.is_none() { + info!( + "call apply_min_max without last val, use current instead {} {:?}", + ts, val + ); + self.apply_min_max(val.clone()); + } self.count += 1; self.last_ts = ts; self.last_val = Some(val); @@ -487,24 +500,27 @@ where } fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgDim0Bins { - let avg = if self.sumc == 0 { - 0f32 + let (min, max, avg) = if self.sumc > 0 { + let avg = self.sum / self.sumc as f32; + (self.min.clone(), self.max.clone(), avg) } else { - self.sum / self.sumc as f32 + let g = match &self.last_val { + Some(x) => x.clone(), + None => NTY::zero(), + }; + (g.clone(), g.clone(), g.as_prim_f32()) }; let ret = MinMaxAvgDim0Bins { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![self.min.clone()], - maxs: vec![self.max.clone()], + mins: vec![min], + maxs: vec![max], avgs: vec![avg], }; self.int_ts = range.beg; self.range = range; self.count = 0; - self.min = NTY::zero(); - self.max = NTY::zero(); self.sum = 0f32; self.sumc = 0; ret @@ -512,29 +528,33 @@ where fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgDim0Bins { // TODO check callsite for correct expand status. - if true || expand { + if expand { debug!("result_reset_time_weight calls apply_event_time_weight"); self.apply_event_time_weight(self.range.end); } else { debug!("result_reset_time_weight NO EXPAND"); } - let avg = { - let sc = self.range.delta() as f32 * 1e-9; - self.sum / sc + let (min, max, avg) = if self.sumc > 0 { + let avg = self.sum / (self.range.delta() as f32 * 1e-9); + (self.min.clone(), self.max.clone(), avg) + } else { + let g = match &self.last_val { + Some(x) => x.clone(), + None => NTY::zero(), + }; + (g.clone(), g.clone(), g.as_prim_f32()) }; let ret = MinMaxAvgDim0Bins { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], - mins: vec![self.min.clone()], - maxs: vec![self.max.clone()], + mins: vec![min], + maxs: vec![max], avgs: vec![avg], }; self.int_ts = range.beg; self.range = range; self.count = 0; - self.min = NTY::zero(); - self.max = NTY::zero(); self.sum = 0f32; self.sumc = 0; ret @@ -586,8 +606,6 @@ where impl TimeBinnableDyn for ScalarEvents { fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { - eprintln!("ScalarEvents time_binner_new"); - info!("ScalarEvents time_binner_new"); let ret = ScalarEventsTimeBinner::::new(edges.into(), do_time_weight); Box::new(ret) } @@ -601,134 +619,74 @@ impl EventsDyn for ScalarEvents { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn { self as &dyn TimeBinnableDyn } + + fn verify(&self) { + let mut ts_max = 0; + for ts in &self.tss { + let ts = *ts; + if ts < ts_max { + error!("unordered event data ts {} ts_max {}", ts, ts_max); + } + ts_max = ts_max.max(ts); + } + } + + fn output_info(&self) { + if false { + info!("output_info len {}", self.tss.len()); + if self.tss.len() == 1 { + info!( + " only: ts {} pulse {} value {:?}", + self.tss[0], self.pulses[0], self.values[0] + ); + } else if self.tss.len() > 1 { + info!( + " first: ts {} pulse {} value {:?}", + self.tss[0], self.pulses[0], self.values[0] + ); + let n = self.tss.len() - 1; + info!( + " last: ts {} pulse {} value {:?}", + self.tss[n], self.pulses[n], self.values[n] + ); + } + } + } } pub struct ScalarEventsTimeBinner { + // The first two edges are used the next time that we create an aggregator, or push a zero bin. edges: VecDeque, do_time_weight: bool, - range: NanoRange, agg: Option>, ready: Option< as TimeBinnableTypeAggregator>::Output>, } impl ScalarEventsTimeBinner { fn new(edges: VecDeque, do_time_weight: bool) -> Self { - let range = if edges.len() >= 2 { - NanoRange { - beg: edges[0], - end: edges[1], - } - } else { - // Using a dummy for this case. - NanoRange { beg: 1, end: 2 } - }; Self { edges, do_time_weight, - range, agg: None, ready: None, } } - // Move the bin from the current aggregator (if any) to our output collection, - // and step forward in our bin list. - fn cycle(&mut self) { - // TODO expand should be derived from AggKind. Is it still required after all? - let expand = true; - if let Some(agg) = self.agg.as_mut() { - let mut h = agg.result_reset(self.range.clone(), expand); - match self.ready.as_mut() { - Some(fin) => { - fin.append(&mut h); - } - None => { - self.ready = Some(h); - } - } - } else { - let mut h = MinMaxAvgDim0Bins::::empty(); - h.append_zero(self.range.beg, self.range.end); - match self.ready.as_mut() { - Some(fin) => { - fin.append(&mut h); - } - None => { - self.ready = Some(h); - } - } - } - self.edges.pop_front(); + fn next_bin_range(&mut self) -> Option { if self.edges.len() >= 2 { - self.range = NanoRange { + let ret = NanoRange { beg: self.edges[0], end: self.edges[1], }; + self.edges.pop_front(); + Some(ret) } else { - // Using a dummy for this case. - self.range = NanoRange { beg: 1, end: 2 }; + None } } } impl TimeBinnerDyn for ScalarEventsTimeBinner { - fn cycle(&mut self) { - Self::cycle(self) - } - - fn ingest(&mut self, item: &dyn TimeBinnableDyn) { - if item.len() == 0 { - // Return already here, RangeOverlapInfo would not give much sense. - return; - } - if self.edges.len() < 2 { - warn!("TimeBinnerDyn for ScalarEventsTimeBinner no more bin in edges A"); - return; - } - // TODO optimize by remembering at which event array index we have arrived. - // That needs modified interfaces which can take and yield the start and latest index. - loop { - while item.starts_after(self.range.clone()) { - self.cycle(); - if self.edges.len() < 2 { - warn!("TimeBinnerDyn for ScalarEventsTimeBinner no more bin in edges B"); - return; - } - } - if item.ends_before(self.range.clone()) { - return; - } else { - if self.edges.len() < 2 { - warn!("TimeBinnerDyn for ScalarEventsTimeBinner edge list exhausted"); - return; - } else { - if self.agg.is_none() { - self.agg = Some(EventValuesAggregator::new(self.range.clone(), self.do_time_weight)); - } - let agg = self.agg.as_mut().unwrap(); - if let Some(item) = item - .as_any() - .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() - { - // TODO collect statistics associated with this request: - agg.ingest(item); - } else { - error!("not correct item type"); - }; - if item.ends_after(self.range.clone()) { - self.cycle(); - if self.edges.len() < 2 { - warn!("TimeBinnerDyn for ScalarEventsTimeBinner no more bin in edges C"); - return; - } - } else { - break; - } - } - } - } - } - fn bins_ready_count(&self) -> usize { match &self.ready { Some(k) => k.len(), @@ -742,4 +700,133 @@ impl TimeBinnerDyn for ScalarEventsTimeBinner { None => None, } } + + fn ingest(&mut self, item: &dyn TimeBinnableDyn) { + const SELF: &str = "ScalarEventsTimeBinner"; + if item.len() == 0 { + // Return already here, RangeOverlapInfo would not give much sense. + return; + } + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} no more bin in edges A"); + return; + } + // TODO optimize by remembering at which event array index we have arrived. + // That needs modified interfaces which can take and yield the start and latest index. + loop { + while item.starts_after(NanoRange { + beg: 0, + end: self.edges[1], + }) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} no more bin in edges B"); + return; + } + } + if item.ends_before(NanoRange { + beg: self.edges[0], + end: u64::MAX, + }) { + return; + } else { + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} edge list exhausted"); + return; + } else { + let agg = if let Some(agg) = self.agg.as_mut() { + agg + } else { + self.agg = Some(EventValuesAggregator::new( + // We know here that we have enough edges for another bin. + // and `next_bin_range` will pop the first edge. + self.next_bin_range().unwrap(), + self.do_time_weight, + )); + self.agg.as_mut().unwrap() + }; + if let Some(item) = item + .as_any() + // TODO make statically sure that we attempt to cast to the correct type here: + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + { + // TODO collect statistics associated with this request: + agg.ingest(item); + } else { + error!("not correct item type"); + }; + if item.ends_after(agg.range().clone()) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} no more bin in edges C"); + return; + } + } else { + break; + } + } + } + } + } + + fn push_in_progress(&mut self, push_empty: bool) { + // TODO expand should be derived from AggKind. Is it still required after all? + // TODO here, the expand means that agg will assume that the current value is kept constant during + // the rest of the time range. + let expand = true; + let range_next = if self.agg.is_some() { + if let Some(x) = self.next_bin_range() { + Some(x) + } else { + None + } + } else { + None + }; + if let Some(agg) = self.agg.as_mut() { + let mut bins; + if let Some(range_next) = range_next { + bins = agg.result_reset(range_next, expand); + } else { + let range_next = NanoRange { beg: 4, end: 5 }; + bins = agg.result_reset(range_next, expand); + self.agg = None; + } + assert_eq!(bins.len(), 1); + if push_empty || bins.counts[0] != 0 { + match self.ready.as_mut() { + Some(ready) => { + ready.append(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + } + } + } + + fn cycle(&mut self) { + let n = self.bins_ready_count(); + self.push_in_progress(true); + if self.bins_ready_count() == n { + if let Some(range) = self.next_bin_range() { + let mut bins = MinMaxAvgDim0Bins::::empty(); + bins.append_zero(range.beg, range.end); + match self.ready.as_mut() { + Some(ready) => { + ready.append(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + if self.bins_ready_count() <= n { + error!("failed to push a zero bin"); + } + } else { + warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); + } + } + } } diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index d83415e..ece9a0f 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -531,4 +531,12 @@ impl EventsDyn for WaveEvents { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn { self as &dyn TimeBinnableDyn } + + fn verify(&self) { + todo!() + } + + fn output_info(&self) { + todo!() + } } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 10e33d3..baf9501 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -660,7 +660,7 @@ impl fmt::Debug for Nanos { } } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize, PartialEq)] pub struct NanoRange { pub beg: u64, pub end: u64, @@ -1031,14 +1031,6 @@ const BIN_T_LEN_OPTIONS_0: [u64; 3] = [ DAY, ]; -const PATCH_T_LEN_KEY: [u64; 3] = [ - // - //SEC, - MIN * 1, - HOUR * 1, - DAY, -]; - const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 3] = [ // //MIN * 60, @@ -1108,7 +1100,7 @@ impl PreBinnedPatchGridSpec { } pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool { - for &j in PATCH_T_LEN_KEY.iter() { + for &j in BIN_T_LEN_OPTIONS_0.iter() { if bin_t_len == j { return true; } @@ -1142,21 +1134,21 @@ fn get_patch_t_len(bin_t_len: u64) -> u64 { let shape = Shape::Scalar; match shape { Shape::Scalar => { - for (i1, &j) in PATCH_T_LEN_KEY.iter().enumerate() { + for (i1, &j) in BIN_T_LEN_OPTIONS_0.iter().enumerate() { if bin_t_len == j { return PATCH_T_LEN_OPTIONS_SCALAR[i1]; } } } Shape::Wave(..) => { - for (i1, &j) in PATCH_T_LEN_KEY.iter().enumerate() { + for (i1, &j) in BIN_T_LEN_OPTIONS_0.iter().enumerate() { if bin_t_len == j { return PATCH_T_LEN_OPTIONS_WAVE[i1]; } } } Shape::Image(..) => { - for (i1, &j) in PATCH_T_LEN_KEY.iter().enumerate() { + for (i1, &j) in BIN_T_LEN_OPTIONS_0.iter().enumerate() { if bin_t_len == j { return PATCH_T_LEN_OPTIONS_WAVE[i1]; } @@ -1221,9 +1213,21 @@ impl PreBinnedPatchRange { t += bin_len; ret.push(t); } + if ret.len() as u64 != self.bin_count() + 1 { + error!("edges() yields wrong number {} vs {}", ret.len(), self.bin_count()); + panic!(); + } ret } + pub fn range(&self) -> NanoRange { + let pl = self.grid_spec.patch_t_len; + NanoRange { + beg: pl * self.offset, + end: pl * (self.offset + self.count), + } + } + pub fn patch_count(&self) -> u64 { self.count } @@ -1352,7 +1356,7 @@ impl BinnedGridSpec { } pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool { - for &j in PATCH_T_LEN_KEY.iter() { + for &j in BIN_T_LEN_OPTIONS_0.iter() { if bin_t_len == j { return true; } @@ -1850,6 +1854,7 @@ mod test { pub struct ChannelSearchSingleResult { pub backend: String, pub name: String, + #[serde(rename = "seriesId")] pub series: u64, pub source: String, #[serde(rename = "type")]