From d0a7240934c176a675e335de64841a928eaf5337 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 6 Jul 2022 15:51:05 +0200 Subject: [PATCH] Refactor and prepare for scylla based bin caching --- daqbufp2/src/test/events.rs | 2 +- dbconn/src/bincache.rs | 389 ++++++++++++++++++++++++------- dbconn/src/events_scylla.rs | 52 +++-- disk/src/binned/prebinned.rs | 12 +- disk/src/binned/query.rs | 9 +- disk/src/eventchunker.rs | 5 + disk/src/events.rs | 142 +++++------ httpret/src/channelconfig.rs | 186 ++++++++++++--- httpret/src/download.rs | 4 + httpret/src/events.rs | 16 +- httpret/src/evinfo.rs | 1 + httpret/src/httpret.rs | 11 +- httpret/src/pulsemap.rs | 6 + items/src/binnedevents.rs | 15 ++ items/src/binsdim0.rs | 308 ++++++++++++++++++------ items/src/binsdim1.rs | 56 +++-- items/src/eventsitem.rs | 6 + items/src/frame.rs | 45 ++-- items/src/items.rs | 279 ++++++++++++++++++---- items/src/numops.rs | 1 + items/src/plainevents.rs | 15 ++ items/src/scalarevents.rs | 257 +++++++++++++++++--- items/src/statsevents.rs | 20 +- items/src/waveevents.rs | 30 ++- items/src/xbinnedscalarevents.rs | 75 +++--- items/src/xbinnedwaveevents.rs | 24 +- netpod/src/netpod.rs | 200 ++++++++-------- netpod/src/query.rs | 21 +- nodenet/src/conn.rs | 2 +- 29 files changed, 1647 insertions(+), 542 deletions(-) diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index c8d3372..1dd3884 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -12,7 +12,7 @@ use items::numops::NumOps; use items::scalarevents::ScalarEvents; use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen}; use netpod::log::*; -use netpod::{Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET}; +use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET}; use serde_json::Value as JsonValue; use std::fmt::Debug; use std::future::ready; diff --git a/dbconn/src/bincache.rs b/dbconn/src/bincache.rs index 
e6531c4..a383c8b 100644 --- a/dbconn/src/bincache.rs +++ b/dbconn/src/bincache.rs @@ -2,28 +2,109 @@ use crate::events_scylla::EventsStreamScylla; use crate::ErrConv; use err::Error; use futures_util::{Future, Stream, StreamExt}; -use items::{TimeBinnableDyn, TimeBinned}; +use items::binsdim0::MinMaxAvgDim0Bins; +use items::{empty_binned_dyn, empty_events_dyn, RangeCompletableItem, StreamItem, TimeBinned}; use netpod::log::*; -use netpod::query::RawEventsQuery; +use netpod::query::{CacheUsage, RawEventsQuery}; +use netpod::timeunits::*; use netpod::{ - AggKind, ChannelTyped, NanoRange, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ScyllaConfig, + AggKind, ChannelTyped, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ScalarType, ScyllaConfig, + Shape, }; use scylla::Session as ScySession; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; pub async fn read_cached_scylla( + series: u64, chn: &ChannelTyped, coord: &PreBinnedPatchCoord, + _agg_kind: AggKind, scy: &ScySession, ) -> Result>, Error> { - let _ = coord; - let series = chn.series_id()?; - let res = scy.query_iter("", (series as i64,)).await.err_conv()?; - let _ = res; - // TODO look for the data. Based on the ChannelTyped we know what type the caller expects. - err::todoval() + let vals = ( + series as i64, + (coord.bin_t_len() / SEC) as i32, + (coord.patch_t_len() / SEC) as i32, + coord.ix() as i64, + ); + let res = scy + .query_iter( + "select counts, avgs, mins, maxs from binned_scalar_f32 where series = ? and bin_len_sec = ? and patch_len_sec = ? 
and agg_kind = 'dummy-agg-kind' and offset = ?", + vals, + ) + .await; + let mut res = res.err_conv().map_err(|e| { + error!("can not read from cache"); + e + })?; + while let Some(item) = res.next().await { + let row = item.err_conv()?; + let edges = coord.edges(); + let (counts, avgs, mins, maxs): (Vec, Vec, Vec, Vec) = row.into_typed().err_conv()?; + let mut counts_mismatch = false; + if edges.len() != counts.len() + 1 { + counts_mismatch = true; + } + if counts.len() != avgs.len() { + counts_mismatch = true; + } + let ts1s = edges[..(edges.len() - 1).min(edges.len())].to_vec(); + let ts2s = edges[1.min(edges.len())..].to_vec(); + if ts1s.len() != ts2s.len() { + error!("ts1s vs ts2s mismatch"); + counts_mismatch = true; + } + if ts1s.len() != counts.len() { + counts_mismatch = true; + } + let avgs = avgs.into_iter().map(|x| x).collect::>(); + let mins = mins.into_iter().map(|x| x as _).collect::>(); + let maxs = maxs.into_iter().map(|x| x as _).collect::>(); + if counts_mismatch { + error!( + "mismatch: edges {} ts1s {} ts2s {} counts {} avgs {} mins {} maxs {}", + edges.len(), + ts1s.len(), + ts2s.len(), + counts.len(), + avgs.len(), + mins.len(), + maxs.len(), + ); + } + let counts: Vec<_> = counts.into_iter().map(|x| x as u64).collect(); + // TODO construct a dyn TimeBinned using the scalar type and shape information. + // TODO place the values with little copying into the TimeBinned. 
+ use ScalarType::*; + use Shape::*; + match &chn.shape { + Scalar => match &chn.scalar_type { + F64 => { + let ret = MinMaxAvgDim0Bins:: { + ts1s, + ts2s, + counts, + avgs, + mins, + maxs, + }; + return Ok(Some(Box::new(ret))); + } + _ => { + error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape); + err::todoval() + } + }, + _ => { + error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape); + err::todoval() + } + } + } + Ok(None) } #[allow(unused)] @@ -50,136 +131,277 @@ impl<'a> Future for WriteFut<'a> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let _ = cx; - todo!() + Poll::Ready(Ok(())) } } pub fn write_cached_scylla<'a>( + series: u64, chn: &ChannelTyped, - coord: &PreBinnedPatchCoord, + coord: &'a PreBinnedPatchCoord, data: &'a dyn TimeBinned, scy: &ScySession, ) -> Pin> + Send + 'a>> { - let chn = unsafe { &*(chn as *const ChannelTyped) }; + let _chn = unsafe { &*(chn as *const ChannelTyped) }; let data = unsafe { &*(data as *const dyn TimeBinned) }; let scy = unsafe { &*(scy as *const ScySession) }; let fut = async move { - let _ = coord; - let series = chn.series_id()?; - let res = scy - .query_iter("", (series as i64, data.dummy_test_i32())) - .await - .err_conv()?; - let _ = res; - // TODO write the data. 
- //err::todoval(); + let bin_len_sec = (coord.bin_t_len() / SEC) as i32; + let patch_len_sec = (coord.patch_t_len() / SEC) as i32; + let offset = coord.ix(); + warn!( + "write_cached_scylla len {} where series = {} and bin_len_sec = {} and patch_len_sec = {} and agg_kind = 'dummy-agg-kind' and offset = {}", + series, + bin_len_sec, + patch_len_sec, + offset, + data.counts().len() + ); + let stmt = scy.prepare("insert into binned_scalar_f32 (series, bin_len_sec, patch_len_sec, agg_kind, offset, counts, avgs, mins, maxs) values (?, ?, ?, 'dummy-agg-kind', ?, ?, ?, ?, ?)").await.err_conv()?; + scy.execute( + &stmt, + ( + series as i64, + bin_len_sec, + patch_len_sec, + offset as i64, + data.counts().iter().map(|x| *x as i64).collect::>(), + data.avgs(), + data.mins(), + data.maxs(), + ), + ) + .await + .err_conv() + .map_err(|e| { + error!("can not write to cache"); + e + })?; Ok(()) }; Box::pin(fut) } -// TODO must indicate to the caller whether it is safe to cache this (complete). pub async fn fetch_uncached_data( + series: u64, chn: ChannelTyped, coord: PreBinnedPatchCoord, + agg_kind: AggKind, + cache_usage: CacheUsage, scy: Arc, -) -> Result>, Error> { - info!("fetch_uncached_data"); - let range = coord.patch_range(); - // TODO why the extra plus one? - let bin = match PreBinnedPatchRange::covering_range(range, coord.bin_count() + 1) { - Ok(Some(range)) => fetch_uncached_higher_res_prebinned(&chn, &range, scy.clone()).await, - Ok(None) => fetch_uncached_binned_events(&chn, &coord.patch_range(), scy.clone()).await, +) -> Result, bool)>, Error> { + info!("fetch_uncached_data {coord:?}"); + // Try to find a higher resolution pre-binned grid which covers the requested patch. 
+ let (bin, complete) = match PreBinnedPatchRange::covering_range(coord.patch_range(), coord.bin_count() + 1) { + Ok(Some(range)) => { + fetch_uncached_higher_res_prebinned(series, &chn, range, agg_kind, cache_usage.clone(), scy.clone()).await + } + Ok(None) => fetch_uncached_binned_events(series, &chn, coord.clone(), agg_kind, scy.clone()).await, Err(e) => Err(e), }?; - //let data = bin.workaround_clone(); - WriteFut::new(&chn, &coord, bin.as_ref(), &scy).await?; - write_cached_scylla(&chn, &coord, bin.as_ref(), &scy).await?; - Ok(Some(bin)) + if true || complete { + let edges = coord.edges(); + if edges.len() < bin.len() + 1 { + error!( + "attempt to write overfull bin to cache edges {} bin {}", + edges.len(), + bin.len() + ); + return Err(Error::with_msg_no_trace(format!( + "attempt to write overfull bin to cache" + ))); + } else if edges.len() > bin.len() + 1 { + let missing = edges.len() - bin.len() - 1; + error!("attempt to write incomplete bin to cache missing {missing}"); + } + if let CacheUsage::Use | CacheUsage::Recreate = &cache_usage { + WriteFut::new(&chn, &coord, bin.as_ref(), &scy).await?; + write_cached_scylla(series, &chn, &coord, bin.as_ref(), &scy).await?; + } + } + Ok(Some((bin, complete))) } pub fn fetch_uncached_data_box( + series: u64, chn: &ChannelTyped, coord: &PreBinnedPatchCoord, + agg_kind: AggKind, + cache_usage: CacheUsage, scy: Arc, -) -> Pin>, Error>> + Send>> { - Box::pin(fetch_uncached_data(chn.clone(), coord.clone(), scy)) +) -> Pin, bool)>, Error>> + Send>> { + Box::pin(fetch_uncached_data( + series, + chn.clone(), + coord.clone(), + agg_kind, + cache_usage, + scy, + )) } pub async fn fetch_uncached_higher_res_prebinned( + series: u64, chn: &ChannelTyped, - range: &PreBinnedPatchRange, + range: PreBinnedPatchRange, + agg_kind: AggKind, + cache_usage: CacheUsage, scy: Arc, -) -> Result, Error> { - let mut aggt = None; +) -> Result<(Box, bool), Error> { + // TODO refine the AggKind scheme or introduce a new BinningOpts type and get 
time-weight from there. + let do_time_weight = true; + // We must produce some result with correct types even if upstream delivers nothing at all. + let bin0 = empty_binned_dyn(&chn.scalar_type, &chn.shape, &agg_kind); + let mut time_binner = bin0.time_binner_new(range.edges(), do_time_weight); + let mut complete = true; let patch_it = PreBinnedPatchIterator::from_range(range.clone()); for patch in patch_it { + // We request data here for a Coord, meaning that we expect to receive multiple bins. + // The expectation is that we receive a single TimeBinned which contains all bins of that PatchCoord. let coord = PreBinnedPatchCoord::new(patch.bin_t_len(), patch.patch_t_len(), patch.ix()); - let mut stream = pre_binned_value_stream_with_scy(chn, &coord, scy.clone()).await?; - while let Some(item) = stream.next().await { - let item = item?; - // TODO here I will need some new API to aggregate (time-bin) trait objects. - // Each TimeBinned must provide some way to do that... - // I also need an Aggregator which does not know before the first item what output type it will produce. 
- let _ = item; - if aggt.is_none() { - aggt = Some(item.aggregator_new()); - } - let aggt = aggt.as_mut().unwrap(); - aggt.ingest(item.as_time_binnable_dyn()); - } + let (bin, comp) = + pre_binned_value_stream_with_scy(series, chn, &coord, agg_kind.clone(), cache_usage.clone(), scy.clone()) + .await?; + complete = complete & comp; + time_binner.ingest(bin.as_time_binnable_dyn()); } - let mut aggt = aggt.unwrap(); - let res = aggt.result(); - Ok(res) + // Fixed limit to defend against a malformed implementation: + let mut i = 0; + while i < 80000 && time_binner.bins_ready_count() < range.bin_count() as usize { + if false { + trace!( + "extra cycle {} {} {}", + i, + time_binner.bins_ready_count(), + range.bin_count() + ); + } + time_binner.cycle(); + i += 1; + } + if time_binner.bins_ready_count() < range.bin_count() as usize { + return Err(Error::with_msg_no_trace(format!( + "unable to produce all bins for the patch range {} vs {}", + time_binner.bins_ready_count(), + range.bin_count(), + ))); + } + let ready = time_binner + .bins_ready() + .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch range")))?; + Ok((ready, complete)) } pub async fn fetch_uncached_binned_events( + series: u64, chn: &ChannelTyped, - range: &NanoRange, + coord: PreBinnedPatchCoord, + agg_kind: AggKind, scy: Arc, -) -> Result, Error> { - // TODO ask Scylla directly, do not go through HTTP. - // Refactor the event fetch stream code such that I can use that easily here. - let evq = RawEventsQuery::new(chn.channel.clone(), range.clone(), AggKind::Plain); - let _res = Box::pin(EventsStreamScylla::new( - &evq, - chn.scalar_type.clone(), - chn.shape.clone(), - scy, - false, - )); - // TODO add the time binner. - // TODO return the result of the binning procedure. - // TODO ScyllaFramableStream must return a new events trait object designed for trait object use. 
- err::todoval() +) -> Result<(Box, bool), Error> { + // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. + let do_time_weight = true; + // We must produce some result with correct types even if upstream delivers nothing at all. + let bin0 = empty_events_dyn(&chn.scalar_type, &chn.shape, &agg_kind); + let mut time_binner = bin0.time_binner_new(coord.edges(), do_time_weight); + let deadline = Instant::now(); + let deadline = deadline + .checked_add(Duration::from_millis(6000)) + .ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?; + let evq = RawEventsQuery::new(chn.channel.clone(), coord.patch_range(), AggKind::Plain); + let mut events_dyn = EventsStreamScylla::new(series, &evq, chn.scalar_type.clone(), chn.shape.clone(), scy, false); + let mut complete = false; + loop { + let item = tokio::time::timeout_at(deadline.into(), events_dyn.next()).await; + let item = match item { + Ok(Some(k)) => k, + Ok(None) => break, + Err(_) => { + error!("fetch_uncached_binned_events timeout"); + return Err(Error::with_msg_no_trace(format!( + "TODO handle fetch_uncached_binned_events timeout" + ))); + } + }; + match item { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => { + time_binner.ingest(item.as_time_binnable_dyn()); + // TODO could also ask the binner here whether we are "complete" to stop sending useless data. 
+ } + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { + complete = true; + } + Ok(StreamItem::Stats(_item)) => { + warn!("TODO forward in stream bincache stats"); + } + Ok(StreamItem::Log(item)) => { + warn!("TODO forward in stream bincache log msg {}", item.msg); + } + Err(e) => return Err(e), + } + } + // Fixed limit to defend against a malformed implementation: + let mut i = 0; + while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize { + if false { + trace!( + "extra cycle {} {} {}", + i, + time_binner.bins_ready_count(), + coord.bin_count() + ); + } + time_binner.cycle(); + i += 1; + } + if time_binner.bins_ready_count() < coord.bin_count() as usize { + return Err(Error::with_msg_no_trace(format!( + "unable to produce all bins for the patch range {} vs {}", + time_binner.bins_ready_count(), + coord.bin_count(), + ))); + } + let ready = time_binner + .bins_ready() + .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch")))?; + Ok((ready, complete)) } pub async fn pre_binned_value_stream_with_scy( + series: u64, chn: &ChannelTyped, coord: &PreBinnedPatchCoord, + agg_kind: AggKind, + cache_usage: CacheUsage, scy: Arc, -) -> Result, Error>> + Send>>, Error> { - info!("pre_binned_value_stream_with_scy {chn:?} {coord:?}"); - // TODO determine the range: - let range = err::todoval(); - if let Some(item) = read_cached_scylla(chn, &range, &scy).await? { - Ok(Box::pin(futures_util::stream::iter([Ok(item)]))) +) -> Result<(Box, bool), Error> { + trace!("pre_binned_value_stream_with_scy {chn:?} {coord:?}"); + if let (Some(item), CacheUsage::Use) = ( + read_cached_scylla(series, chn, coord, agg_kind.clone(), &scy).await?, + &cache_usage, + ) { + info!("+++++++++++++ GOOD READ"); + Ok((item, true)) } else { - let bin = fetch_uncached_data_box(chn, coord, scy).await?; - // TODO when can it ever be that we get back a None? 
- // TODO also, probably the caller wants to know whether the bin is Complete. - let bin = bin.unwrap(); - Ok(Box::pin(futures_util::stream::iter([Ok(bin)]))) + if let CacheUsage::Use = &cache_usage { + warn!("--+--+--+--+--+--+ NOT YET CACHED"); + } + let res = fetch_uncached_data_box(series, chn, coord, agg_kind, cache_usage, scy).await?; + let (bin, complete) = + res.ok_or_else(|| Error::with_msg_no_trace(format!("pre_binned_value_stream_with_scy got None bin")))?; + Ok((bin, complete)) } } pub async fn pre_binned_value_stream( + series: u64, chn: &ChannelTyped, coord: &PreBinnedPatchCoord, + agg_kind: AggKind, + cache_usage: CacheUsage, scyconf: &ScyllaConfig, ) -> Result, Error>> + Send>>, Error> { - info!("pre_binned_value_stream {chn:?} {coord:?} {scyconf:?}"); + trace!("pre_binned_value_stream series {series} {chn:?} {coord:?} {scyconf:?}"); let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) .use_keyspace(&scyconf.keyspace, true) @@ -187,5 +409,6 @@ pub async fn pre_binned_value_stream( .await .err_conv()?; let scy = Arc::new(scy); - pre_binned_value_stream_with_scy(chn, coord, scy).await + let res = pre_binned_value_stream_with_scy(series, chn, coord, agg_kind, cache_usage, scy).await?; + Ok(Box::pin(futures_util::stream::iter([Ok(res.0)]))) } diff --git a/dbconn/src/events_scylla.rs b/dbconn/src/events_scylla.rs index 136af75..5b7a723 100644 --- a/dbconn/src/events_scylla.rs +++ b/dbconn/src/events_scylla.rs @@ -3,10 +3,10 @@ use err::Error; use futures_util::{Future, FutureExt, Stream}; use items::scalarevents::ScalarEvents; use items::waveevents::WaveEvents; -use items::{EventsDyn, RangeCompletableItem, Sitemty, StreamItem}; +use items::{EventsDyn, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{Database, NanoRange, ScalarType, ScyllaConfig, Shape}; +use netpod::{Channel, Database, NanoRange, ScalarType, ScyllaConfig, Shape}; use scylla::Session as ScySession; use 
std::collections::VecDeque; use std::pin::Pin; @@ -111,11 +111,11 @@ enum FrState { pub struct EventsStreamScylla { state: FrState, + series: u64, #[allow(unused)] evq: RawEventsQuery, scalar_type: ScalarType, shape: Shape, - series: u64, range: NanoRange, scy: Arc, do_test_stream_error: bool, @@ -123,6 +123,7 @@ pub struct EventsStreamScylla { impl EventsStreamScylla { pub fn new( + series: u64, evq: &RawEventsQuery, scalar_type: ScalarType, shape: Shape, @@ -131,7 +132,7 @@ impl EventsStreamScylla { ) -> Self { Self { state: FrState::New, - series: evq.channel.series.unwrap(), + series, evq: evq.clone(), scalar_type, shape, @@ -204,19 +205,25 @@ impl Stream for EventsStreamScylla { } } -async fn find_series(series: u64, pgclient: Arc) -> Result<(ScalarType, Shape), Error> { - info!("find_series series {}", series); - let rows = { - let q = "select facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; +async fn find_series(channel: &Channel, pgclient: Arc) -> Result<(u64, ScalarType, Shape), Error> { + info!("find_series channel {:?}", channel); + let rows = if let Some(series) = channel.series() { + let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; pgclient.query(q, &[&(series as i64)]).await.err_conv()? + } else { + let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2"; + pgclient + .query(q, &[&channel.backend(), &channel.name()]) + .await + .err_conv()? 
}; if rows.len() < 1 { - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); + return Err(Error::with_public_msg_no_trace(format!( + "No series found for {channel:?}" + ))); } if rows.len() > 1 { - error!("Multiple series found for channel, can not return data for ambiguous series"); + error!("Multiple series found for {channel:?}"); return Err(Error::with_public_msg_no_trace( "Multiple series found for channel, can not return data for ambiguous series", )); @@ -225,15 +232,14 @@ async fn find_series(series: u64, pgclient: Arc) -> Result<(ScalarType .into_iter() .next() .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?; - info!("row {row:?}"); - let _facility: String = row.get(0); - let _channel: String = row.get(1); - let a: i32 = row.get(2); + let series = row.get::<_, i64>(0) as u64; + let _facility: String = row.get(1); + let _channel: String = row.get(2); + let a: i32 = row.get(3); let scalar_type = ScalarType::from_scylla_i32(a)?; - let a: Vec = row.get(3); + let a: Vec = row.get(4); let shape = Shape::from_scylla_shape_dims(&a)?; - info!("make_scylla_stream series {series} scalar_type {scalar_type:?} shape {shape:?}"); - Ok((scalar_type, shape)) + Ok((series, scalar_type, shape)) } async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { @@ -359,20 +365,21 @@ pub async fn make_scylla_stream( dbconf: Database, do_test_stream_error: bool, ) -> Result>> + Send>>, Error> { - info!("make_scylla_stream open scylla connection"); // TODO should RawEventsQuery already contain ScalarType and Shape? 
- let (scalar_type, shape) = { + let (series, scalar_type, shape) = { let u = { let d = &dbconf; format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name) }; + info!("--------------- open postgres connection"); let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?; // TODO use common connection/pool: tokio::spawn(pgconn); let pgclient = Arc::new(pgclient); - find_series(evq.channel.series.unwrap(), pgclient.clone()).await? + find_series(&evq.channel, pgclient.clone()).await? }; // TODO reuse existing connection: + info!("--------------- open scylla connection"); let scy = scylla::SessionBuilder::new() .known_nodes(&scyco.hosts) .use_keyspace(&scyco.keyspace, true) @@ -381,6 +388,7 @@ pub async fn make_scylla_stream( .err_conv()?; let scy = Arc::new(scy); let res = Box::pin(EventsStreamScylla::new( + series, evq, scalar_type, shape, diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index f49ad0e..124ca09 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -42,13 +42,21 @@ where <::Output as TimeBinnableType>::Output: SitemtyFrameType + TimeBinned, { if let Some(scyconf) = &node_config.node_config.cluster.cache_scylla { - info!("~~~~~~~~~~~~~~~ make_num_pipeline_nty_end_evs_enp using scylla as cache"); + trace!("~~~~~~~~~~~~~~~ make_num_pipeline_nty_end_evs_enp using scylla as cache"); let chn = ChannelTyped { channel: query.channel().clone(), scalar_type, shape, }; - let stream = pre_binned_value_stream(&chn, query.patch(), scyconf).await?; + let stream = pre_binned_value_stream( + chn.channel().series().unwrap(), + &chn, + query.patch(), + agg_kind, + query.cache_usage(), + scyconf, + ) + .await?; let stream = stream.map(|x| { // match x { diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index 03881c2..ca9fd6f 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -2,10 +2,7 @@ use err::Error; use http::request::Parts; use 
netpod::query::{agg_kind_from_binning_scheme, binning_scheme_append_to_url, CacheUsage}; use netpod::timeunits::SEC; -use netpod::{ - channel_append_to_url, channel_from_pairs, AggKind, AppendToUrl, ByteSize, Channel, PreBinnedPatchCoord, - ScalarType, Shape, -}; +use netpod::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, PreBinnedPatchCoord, ScalarType, Shape}; use std::collections::BTreeMap; use url::Url; @@ -81,7 +78,7 @@ impl PreBinnedQuery { .map(|x| Shape::from_url_str(&x))??; let ret = Self { patch: PreBinnedPatchCoord::new(bin_t_len * SEC, patch_t_len * SEC, patch_ix), - channel: channel_from_pairs(&pairs)?, + channel: Channel::from_pairs(&pairs)?, scalar_type, shape, agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1), @@ -148,7 +145,7 @@ impl AppendToUrl for PreBinnedQuery { fn append_to_url(&self, url: &mut Url) { self.patch.append_to_url(url); binning_scheme_append_to_url(&self.agg_kind, url); - channel_append_to_url(url, &self.channel); + self.channel.append_to_url(url); self.shape.append_to_url(url); self.scalar_type.append_to_url(url); let mut g = url.query_pairs_mut(); diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 646c877..d64cb2a 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -565,6 +565,11 @@ impl Appendable for EventFull { self.shapes.extend_from_slice(&src.shapes); self.comps.extend_from_slice(&src.comps); } + + fn append_zero(&mut self, _ts1: u64, _ts2: u64) { + // TODO do we still need this type? 
+ todo!() + } } impl Clearable for EventFull { diff --git a/disk/src/events.rs b/disk/src/events.rs index 2fd493a..3f07fb5 100644 --- a/disk/src/events.rs +++ b/disk/src/events.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, TimeZone, Utc}; use err::Error; -use netpod::{channel_append_to_url, channel_from_pairs, get_url_query_pairs}; +use netpod::get_url_query_pairs; use netpod::{AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos}; use std::time::Duration; use url::Url; @@ -40,54 +40,6 @@ impl PlainEventsQuery { } } - pub fn from_url(url: &Url) -> Result { - let pairs = get_url_query_pairs(url); - let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?; - let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?; - let ret = Self { - channel: channel_from_pairs(&pairs)?, - range: NanoRange { - beg: beg_date.parse::>()?.to_nanos(), - end: end_date.parse::>()?.to_nanos(), - }, - disk_io_buffer_size: pairs - .get("diskIoBufferSize") - .map_or("4096", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, - report_error: pairs - .get("reportError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?, - timeout: pairs - .get("timeout") - .map_or("10000", |k| k) - .parse::() - .map(|k| Duration::from_millis(k)) - .map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?, - events_max: pairs - .get("eventsMax") - .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, - do_log: pairs - .get("doLog") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?, - do_test_main_error: pairs - .get("doTestMainError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError {:?}", e)))?, - do_test_stream_error: pairs - .get("doTestStreamError") - 
.map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?, - }; - Ok(ret) - } - pub fn from_request_head(head: &http::request::Parts) -> Result { let s1 = format!("dummy:{}", head.uri); let url = Url::parse(&s1)?; @@ -130,6 +82,10 @@ impl PlainEventsQuery { self.do_test_stream_error } + pub fn set_series_id(&mut self, series: u64) { + self.channel.series = Some(series); + } + pub fn set_timeout(&mut self, k: Duration) { self.timeout = k; } @@ -141,26 +97,6 @@ impl PlainEventsQuery { pub fn set_do_test_stream_error(&mut self, k: bool) { self.do_test_stream_error = k; } - - pub fn append_to_url(&self, url: &mut Url) { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - channel_append_to_url(url, &self.channel); - let mut g = url.query_pairs_mut(); - g.append_pair( - "begDate", - &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), - ); - g.append_pair( - "endDate", - &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), - ); - g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); - g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); - if let Some(x) = self.events_max.as_ref() { - g.append_pair("eventsMax", &format!("{}", x)); - } - g.append_pair("doLog", &format!("{}", self.do_log)); - } } impl HasBackend for PlainEventsQuery { @@ -177,12 +113,76 @@ impl HasTimeout for PlainEventsQuery { impl FromUrl for PlainEventsQuery { fn from_url(url: &Url) -> Result { - Self::from_url(url) + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { + let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?; + let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?; + let ret = Self { + channel: Channel::from_pairs(&pairs)?, + range: NanoRange { + beg: beg_date.parse::>()?.to_nanos(), + end: 
end_date.parse::>()?.to_nanos(), + }, + disk_io_buffer_size: pairs + .get("diskIoBufferSize") + .map_or("4096", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, + report_error: pairs + .get("reportError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?, + timeout: pairs + .get("timeout") + .map_or("10000", |k| k) + .parse::() + .map(|k| Duration::from_millis(k)) + .map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?, + events_max: pairs + .get("eventsMax") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + do_log: pairs + .get("doLog") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?, + do_test_main_error: pairs + .get("doTestMainError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError {:?}", e)))?, + do_test_stream_error: pairs + .get("doTestStreamError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?, + }; + Ok(ret) } } impl AppendToUrl for PlainEventsQuery { fn append_to_url(&self, url: &mut Url) { - self.append_to_url(url) + let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + self.channel.append_to_url(url); + let mut g = url.query_pairs_mut(); + g.append_pair( + "begDate", + &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), + ); + g.append_pair( + "endDate", + &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), + ); + g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); + g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); + if let Some(x) = self.events_max.as_ref() { + g.append_pair("eventsMax", &format!("{}", x)); + } + g.append_pair("doLog", &format!("{}", self.do_log)); } } diff --git 
a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 9524614..42aff04 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -20,10 +20,19 @@ use std::time::{Duration, Instant}; use url::Url; pub struct ChConf { + pub series: u64, pub scalar_type: ScalarType, pub shape: Shape, } +/// It is an unsolved question as to how we want to uniquely address channels. +/// Currently, the usual (backend, channelname) works in 99% of the cases, but the edge-cases +/// are not solved. At the same time, it is desirable to avoid to complicate things for users. +/// Current state: +/// If the series id is given, we take that. +/// Otherwise we try to uniquely identify the series id from the given information. +/// In the future, we can even try to involve time range information for that, but backends like +/// old archivers and sf databuffer do not support such lookup. pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> Result { if channel.backend != ncc.node_config.cluster.backend { warn!( @@ -31,11 +40,6 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> channel.backend, ncc.node_config.cluster.backend ); } - // This requires the series id. 
- let series = channel.series.ok_or_else(|| { - Error::with_msg_no_trace(format!("needs a series id {:?}", channel)) - .add_public_msg(format!("series id of channel not supplied")) - })?; // TODO use a common already running worker pool for these queries: let dbconf = &ncc.node_config.cluster.database; let dburl = format!( @@ -46,28 +50,59 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> .await .err_conv()?; tokio::spawn(pgconn); - let res = pgclient - .query( - "select scalar_type, shape_dims from series_by_channel where series = $1", - &[&(series as i64)], - ) - .await - .err_conv()?; - if res.len() == 0 { - warn!("can not find channel information for series {series}"); - let e = Error::with_public_msg_no_trace(format!("can not find channel information for series {series}")); - Err(e) - } else if res.len() > 1 { - error!("multiple channel information for series {series}"); - let e = Error::with_public_msg_no_trace(format!("can not find channel information for series {series}")); - Err(e) + if let Some(series) = channel.series() { + let res = pgclient + .query( + "select scalar_type, shape_dims from series_by_channel where series = $1", + &[&(series as i64)], + ) + .await + .err_conv()?; + if res.len() < 1 { + warn!("can not find channel information for series {series} given through {channel:?}"); + let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); + Err(e) + } else { + let row = res.first().unwrap(); + let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(0) as u8)?; + // TODO can I get a slice from psql driver? + let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(1))?; + let ret = ChConf { + series, + scalar_type, + shape, + }; + Ok(ret) + } } else { - let row = res.first().unwrap(); - let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(0) as u8)?; - // TODO can I get a slice from psql driver? 
- let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(1))?; - let ret = ChConf { scalar_type, shape }; - Ok(ret) + let res = pgclient + .query( + "select series, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2", + &[&channel.backend(), &channel.name()], + ) + .await + .err_conv()?; + if res.len() < 1 { + warn!("can not find channel information for {channel:?}"); + let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); + Err(e) + } else if res.len() > 1 { + warn!("ambigious channel {channel:?}"); + let e = Error::with_public_msg_no_trace(format!("ambigious channel {channel:?}")); + Err(e) + } else { + let row = res.first().unwrap(); + let series = row.get::<_, i64>(0) as u64; + let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?; + // TODO can I get a slice from psql driver? + let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(2))?; + let ret = ChConf { + series, + scalar_type, + shape, + }; + Ok(ret) + } } } @@ -81,6 +116,10 @@ pub async fn chconf_from_events_json(q: &PlainEventsQuery, ncc: &NodeConfigCache pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) -> Result { let ret = ChConf { + series: q + .channel() + .series() + .expect("PreBinnedQuery is expected to contain the series id"), scalar_type: q.scalar_type().clone(), shape: q.shape().clone(), }; @@ -330,6 +369,10 @@ pub struct ChannelsWithTypeQuery { impl FromUrl for ChannelsWithTypeQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { let s = pairs .get("scalar_type") .ok_or_else(|| Error::with_public_msg_no_trace("missing scalar_type"))?; @@ -440,6 +483,10 @@ fn bool_false(x: &bool) -> bool { impl FromUrl for ScyllaChannelEventSeriesIdQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + 
fn from_pairs(pairs: &BTreeMap) -> Result { let facility = pairs .get("facility") .ok_or_else(|| Error::with_public_msg_no_trace("missing facility"))? @@ -624,6 +671,10 @@ pub struct ScyllaChannelsActiveQuery { impl FromUrl for ScyllaChannelsActiveQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { let s = pairs .get("tsedge") .ok_or_else(|| Error::with_public_msg_no_trace("missing tsedge"))?; @@ -731,6 +782,10 @@ pub struct ChannelFromSeriesQuery { impl FromUrl for ChannelFromSeriesQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { let s = pairs .get("seriesId") .ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?; @@ -856,6 +911,10 @@ pub struct IocForChannelQuery { impl FromUrl for IocForChannelQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { let facility = pairs .get("facility") .ok_or_else(|| Error::with_public_msg_no_trace("missing facility"))? 
@@ -945,6 +1004,10 @@ pub struct ScyllaSeriesTsMspQuery { impl FromUrl for ScyllaSeriesTsMspQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { let s = pairs .get("seriesId") .ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?; @@ -1029,3 +1092,74 @@ impl ScyllaSeriesTsMsp { Ok(ret) } } + +#[derive(Serialize)] +pub struct AmbigiousChannel { + series: u64, + name: String, + scalar_type: ScalarType, + shape: Shape, +} + +#[derive(Serialize)] +pub struct AmbigiousChannelNamesResponse { + ambigious: Vec, +} + +pub struct AmbigiousChannelNames {} + +impl AmbigiousChannelNames { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/channels/ambigious" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + match self.process(node_config).await { + Ok(k) => { + let body = Body::from(serde_json::to_vec(&k)?); + Ok(response(StatusCode::OK).body(body)?) + } + Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("{:?}", e.public_msg())))?), + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) 
+ } + } + + async fn process(&self, node_config: &NodeConfigCached) -> Result { + let dbconf = &node_config.node_config.cluster.database; + let pg_client = create_connection(dbconf).await?; + let rows = pg_client + .query( + "select t2.series, t2.channel, t2.scalar_type, t2.shape_dims, t2.agg_kind from series_by_channel t1, series_by_channel t2 where t2.channel = t1.channel and t2.series != t1.series", + &[], + ) + .await?; + let mut ret = AmbigiousChannelNamesResponse { ambigious: Vec::new() }; + for row in rows { + let g = AmbigiousChannel { + series: row.get::<_, i64>(0) as u64, + name: row.get(1), + scalar_type: ScalarType::from_scylla_i32(row.get(2))?, + shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?, + }; + ret.ambigious.push(g); + } + Ok(ret) + } +} diff --git a/httpret/src/download.rs b/httpret/src/download.rs index 2d36f88..04e4aab 100644 --- a/httpret/src/download.rs +++ b/httpret/src/download.rs @@ -15,6 +15,10 @@ pub struct DownloadQuery { impl FromUrl for DownloadQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { let read_sys = pairs .get("ReadSys") .map(|x| x as &str) diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 197e9d2..94285c4 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -6,7 +6,7 @@ use futures_util::{StreamExt, TryStreamExt}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; use netpod::log::*; -use netpod::{AggKind, NodeConfigCached}; +use netpod::{AggKind, FromUrl, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; use url::Url; @@ -54,6 +54,13 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) let url = Url::parse(&format!("dummy:{}", req.uri()))?; let query = PlainEventsQuery::from_url(&url)?; let chconf = chconf_from_events_binary(&query, node_config).await?; + + // Update the series id since we don't 
require some unique identifier yet. + let mut query = query; + query.set_series_id(chconf.series); + let query = query; + // --- + let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone()); let s = disk::channelexec::channel_exec( op, @@ -78,6 +85,13 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - let (head, _body) = req.into_parts(); let query = PlainEventsQuery::from_request_head(&head)?; let chconf = chconf_from_events_json(&query, node_config).await?; + + // Update the series id since we don't require some unique identifier yet. + let mut query = query; + query.set_series_id(chconf.series); + let query = query; + // --- + let op = disk::channelexec::PlainEventsJson::new( // TODO pass only the query, not channel, range again: query.clone(), diff --git a/httpret/src/evinfo.rs b/httpret/src/evinfo.rs index aed0359..5e2d142 100644 --- a/httpret/src/evinfo.rs +++ b/httpret/src/evinfo.rs @@ -29,6 +29,7 @@ use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::AggKind; use netpod::Channel; +use netpod::FromUrl; use netpod::NanoRange; use netpod::NodeConfigCached; use netpod::PerfOpts; diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index ca9417e..16d4ff4 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -29,7 +29,7 @@ use net::SocketAddr; use netpod::log::*; use netpod::query::BinnedQuery; use netpod::timeunits::SEC; -use netpod::{channel_from_pairs, get_url_query_pairs}; +use netpod::{get_url_query_pairs, Channel}; use netpod::{FromUrl, NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance}; use netpod::{ACCEPT_ALL, APP_JSON, APP_JSON_LINES, APP_OCTET}; use nodenet::conn::events_service; @@ -235,6 +235,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ChannelFromSeries::handler(&req) { h.handle(req, &node_config).await + } else if let 
Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = events::EventsHandler::handler(&req) { h.handle(req, &node_config).await } else if path == "/api/4/binned" { @@ -424,6 +426,11 @@ async fn binned_inner(req: Request, node_config: &NodeConfigCached) -> Res e.add_public_msg(msg) })?; let chconf = chconf_from_binned(&query, node_config).await?; + // Update the series id since we don't require some unique identifier yet. + let mut query = query; + query.set_series_id(chconf.series); + let query = query; + // --- let desc = format!("binned-BEG-{}-END-{}", query.range().beg / SEC, query.range().end / SEC); let span1 = span!(Level::INFO, "httpret::binned", desc = &desc.as_str()); span1.in_scope(|| { @@ -874,7 +881,7 @@ pub async fn archapp_scan_files_insert( pub async fn archapp_channel_info(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); - let channel = channel_from_pairs(&pairs)?; + let channel = Channel::from_pairs(&pairs)?; match archapp_wrap::channel_info(&channel, node_config).await { Ok(res) => { let buf = serde_json::to_vec(&res)?; diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 8c2ac23..4a52336 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -465,6 +465,12 @@ impl FromUrl for MapPulseQuery { let ret = Self { backend, pulse }; Ok(ret) } + + fn from_pairs(_pairs: &BTreeMap) -> Result { + Err(err::Error::with_msg_no_trace(format!( + "can not only construct from pairs" + ))) + } } impl AppendToUrl for MapPulseQuery { diff --git a/items/src/binnedevents.rs b/items/src/binnedevents.rs index 7694e9d..a3cb61e 100644 --- a/items/src/binnedevents.rs +++ b/items/src/binnedevents.rs @@ -60,6 +60,11 @@ impl Appendable for SingleBinWaveEvents { } }) } + + fn append_zero(&mut self, _ts1: u64, _ts2: u64) { + // TODO can this implement Appendable in a 
sane way? Do we need it? + err::todo(); + } } impl PushableIndex for SingleBinWaveEvents { @@ -150,6 +155,11 @@ impl Appendable for MultiBinWaveEvents { } }) } + + fn append_zero(&mut self, _ts1: u64, _ts2: u64) { + // TODO can this implement Appendable in a sane way? Do we need it? + err::todo(); + } } impl PushableIndex for MultiBinWaveEvents { @@ -249,6 +259,11 @@ impl Appendable for XBinnedEvents { }, } } + + fn append_zero(&mut self, _ts1: u64, _ts2: u64) { + // TODO can this implement Appendable in a sane way? Do we need it? + err::todo(); + } } impl PushableIndex for XBinnedEvents { diff --git a/items/src/binsdim0.rs b/items/src/binsdim0.rs index a4b4aa7..82908f7 100644 --- a/items/src/binsdim0.rs +++ b/items/src/binsdim0.rs @@ -1,17 +1,19 @@ use crate::numops::NumOps; use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::{ - ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, FrameTypeStatic, IsoDateTime, - RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableDyn, - TimeBinnableDynAggregator, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinned, TimeBins, WithLen, + ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, FrameTypeStatic, IsoDateTime, NewEmpty, + RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableDyn, TimeBinnableType, + TimeBinnableTypeAggregator, TimeBinned, TimeBinnerDyn, TimeBins, WithLen, }; use chrono::{TimeZone, Utc}; use err::Error; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::NanoRange; +use netpod::{NanoRange, Shape}; use num_traits::Zero; use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::collections::VecDeque; use std::fmt; use std::marker::PhantomData; use tokio::fs::File; @@ -21,10 +23,9 @@ pub struct MinMaxAvgDim0Bins { pub ts1s: Vec, pub ts2s: Vec, pub counts: Vec, - // TODO get rid of Option: - pub mins: Vec>, - pub maxs: Vec>, - pub avgs: Vec>, + 
pub mins: Vec, + pub maxs: Vec, + pub avgs: Vec, } impl FrameTypeStatic for MinMaxAvgDim0Bins @@ -155,6 +156,19 @@ impl WithLen for MinMaxAvgDim0Bins { } } +impl NewEmpty for MinMaxAvgDim0Bins { + fn empty(_shape: Shape) -> Self { + Self { + ts1s: Vec::new(), + ts2s: Vec::new(), + counts: Vec::new(), + mins: Vec::new(), + maxs: Vec::new(), + avgs: Vec::new(), + } + } +} + impl Appendable for MinMaxAvgDim0Bins where NTY: NumOps, @@ -171,6 +185,15 @@ where self.maxs.extend_from_slice(&src.maxs); self.avgs.extend_from_slice(&src.avgs); } + + fn append_zero(&mut self, ts1: u64, ts2: u64) { + self.ts1s.push(ts1); + self.ts2s.push(ts2); + self.counts.push(0); + self.mins.push(NTY::zero()); + self.maxs.push(NTY::zero()); + self.avgs.push(0.); + } } impl ReadableFromFile for MinMaxAvgDim0Bins @@ -193,7 +216,7 @@ where NTY: NumOps, { type Output = MinMaxAvgDim0Bins; - type Aggregator = MinMaxAvgBinsAggregator; + type Aggregator = MinMaxAvgDim0BinsAggregator; fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { debug!( @@ -233,11 +256,10 @@ pub struct MinMaxAvgBinsCollectedResult { ts_off_ms: Vec, #[serde(rename = "tsNs")] ts_off_ns: Vec, - //ts_bin_edges: Vec, counts: Vec, - mins: Vec>, - maxs: Vec>, - avgs: Vec>, + mins: Vec, + maxs: Vec, + avgs: Vec, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] finalised_range: bool, #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] @@ -340,29 +362,29 @@ where } } -pub struct MinMaxAvgBinsAggregator { +pub struct MinMaxAvgDim0BinsAggregator { range: NanoRange, count: u64, - min: Option, - max: Option, + min: NTY, + max: NTY, sumc: u64, sum: f32, } -impl MinMaxAvgBinsAggregator { +impl MinMaxAvgDim0BinsAggregator { pub fn new(range: NanoRange, _do_time_weight: bool) -> Self { Self { range, count: 0, - min: None, - max: None, + min: NTY::zero(), + max: NTY::zero(), sumc: 0, sum: 0f32, } } } -impl TimeBinnableTypeAggregator for 
MinMaxAvgBinsAggregator +impl TimeBinnableTypeAggregator for MinMaxAvgDim0BinsAggregator where NTY: NumOps, { @@ -375,55 +397,33 @@ where fn ingest(&mut self, item: &Self::Input) { for i1 in 0..item.ts1s.len() { - if item.ts2s[i1] <= self.range.beg { + if item.counts[i1] == 0 { + } else if item.ts2s[i1] <= self.range.beg { } else if item.ts1s[i1] >= self.range.end { } else { - self.min = match &self.min { - None => item.mins[i1].clone(), - Some(min) => match &item.mins[i1] { - None => Some(min.clone()), - Some(v) => { - if v < &min { - Some(v.clone()) - } else { - Some(min.clone()) - } - } - }, - }; - self.max = match &self.max { - None => item.maxs[i1].clone(), - Some(max) => match &item.maxs[i1] { - None => Some(max.clone()), - Some(v) => { - if v > &max { - Some(v.clone()) - } else { - Some(max.clone()) - } - } - }, - }; - match item.avgs[i1] { - None => {} - Some(v) => { - if v.is_nan() { - } else { - self.sum += v; - self.sumc += 1; - } + if self.count == 0 { + self.min = item.mins[i1].clone(); + self.max = item.maxs[i1].clone(); + } else { + if item.mins[i1] < self.min { + self.min = item.mins[i1].clone(); + } + if item.maxs[i1] > self.max { + self.max = item.maxs[i1].clone(); } } self.count += item.counts[i1]; + self.sum += item.avgs[i1]; + self.sumc += 1; } } } fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output { let avg = if self.sumc == 0 { - None + 0f32 } else { - Some(self.sum / self.sumc as f32) + self.sum / self.sumc as f32 }; let ret = Self::Output { ts1s: vec![self.range.beg], @@ -434,8 +434,8 @@ where avgs: vec![avg], }; self.count = 0; - self.min = None; - self.max = None; + self.min = NTY::zero(); + self.max = NTY::zero(); self.range = range; self.sum = 0f32; self.sumc = 0; @@ -443,9 +443,171 @@ where } } -impl TimeBinnableDyn for MinMaxAvgDim0Bins { - fn aggregator_new(&self) -> Box { - todo!() +impl TimeBinnableDyn for MinMaxAvgDim0Bins { + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { + 
eprintln!("MinMaxAvgDim0Bins time_binner_new"); + info!("MinMaxAvgDim0Bins time_binner_new"); + let ret = MinMaxAvgDim0BinsTimeBinner::::new(edges.into(), do_time_weight); + Box::new(ret) + } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } +} + +pub struct MinMaxAvgDim0BinsTimeBinner { + edges: VecDeque, + do_time_weight: bool, + range: NanoRange, + agg: Option>, + ready: Option< as TimeBinnableTypeAggregator>::Output>, +} + +impl MinMaxAvgDim0BinsTimeBinner { + fn new(edges: VecDeque, do_time_weight: bool) -> Self { + let range = if edges.len() >= 2 { + NanoRange { + beg: edges[0], + end: edges[1], + } + } else { + // Using a dummy for this case. + NanoRange { beg: 1, end: 2 } + }; + Self { + edges, + do_time_weight, + range, + agg: None, + ready: None, + } + } + + // Move the bin from the current aggregator (if any) to our output collection, + // and step forward in our bin list. + fn cycle(&mut self) { + eprintln!("cycle"); + // TODO where to take expand from? Is it still required after all? + let expand = true; + let have_next_bin = self.edges.len() >= 3; + let range_next = if have_next_bin { + NanoRange { + beg: self.edges[1], + end: self.edges[2], + } + } else { + // Using a dummy for this case. 
+ NanoRange { beg: 1, end: 2 } + }; + if let Some(agg) = self.agg.as_mut() { + eprintln!("cycle: use existing agg: {:?}", agg.range); + let mut h = agg.result_reset(range_next.clone(), expand); + match self.ready.as_mut() { + Some(fin) => { + fin.append(&mut h); + } + None => { + self.ready = Some(h); + } + } + } else if have_next_bin { + eprintln!("cycle: append a zero bin"); + let mut h = MinMaxAvgDim0Bins::::empty(); + h.append_zero(self.range.beg, self.range.end); + match self.ready.as_mut() { + Some(fin) => { + fin.append(&mut h); + } + None => { + self.ready = Some(h); + } + } + } else { + eprintln!("cycle: no more next bin"); + } + self.range = range_next; + self.edges.pop_front(); + if !have_next_bin { + self.agg = None; + } + } +} + +impl TimeBinnerDyn for MinMaxAvgDim0BinsTimeBinner { + fn cycle(&mut self) { + Self::cycle(self) + } + + fn ingest(&mut self, item: &dyn TimeBinnableDyn) { + const SELF: &str = "MinMaxAvgDim0BinsTimeBinner"; + if item.len() == 0 { + // Return already here, RangeOverlapInfo would not give much sense. + return; + } + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} no more bin in edges A"); + return; + } + // TODO optimize by remembering at which event array index we have arrived. + // That needs modified interfaces which can take and yield the start and latest index. 
+ loop { + while item.starts_after(self.range.clone()) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} no more bin in edges B"); + return; + } + } + if item.ends_before(self.range.clone()) { + return; + } else { + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} edge list exhausted"); + return; + } else { + if self.agg.is_none() { + self.agg = Some(MinMaxAvgDim0BinsAggregator::new( + self.range.clone(), + self.do_time_weight, + )); + } + let agg = self.agg.as_mut().unwrap(); + if let Some(item) = + item.as_any() + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + { + agg.ingest(item); + } else { + let tyid_item = std::any::Any::type_id(item.as_any()); + error!("not correct item type {:?}", tyid_item); + }; + if item.ends_after(self.range.clone()) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} no more bin in edges C"); + return; + } + } else { + break; + } + } + } + } + } + + fn bins_ready_count(&self) -> usize { + match &self.ready { + Some(k) => k.len(), + None => 0, + } + } + + fn bins_ready(&mut self) -> Option> { + match self.ready.take() { + Some(k) => Some(Box::new(k)), + None => None, + } } } @@ -454,13 +616,23 @@ impl TimeBinned for MinMaxAvgDim0Bins { self as &dyn TimeBinnableDyn } - fn workaround_clone(&self) -> Box { - // TODO remove - panic!() + fn edges_slice(&self) -> (&[u64], &[u64]) { + (&self.ts1s[..], &self.ts2s[..]) } - fn dummy_test_i32(&self) -> i32 { - // TODO remove - panic!() + fn counts(&self) -> &[u64] { + &self.counts[..] 
+ } + + fn mins(&self) -> Vec { + self.mins.iter().map(|x| x.clone().as_prim_f32()).collect() + } + + fn maxs(&self) -> Vec { + self.maxs.iter().map(|x| x.clone().as_prim_f32()).collect() + } + + fn avgs(&self) -> Vec { + self.avgs.clone() } } diff --git a/items/src/binsdim1.rs b/items/src/binsdim1.rs index cbeb92f..7528095 100644 --- a/items/src/binsdim1.rs +++ b/items/src/binsdim1.rs @@ -3,14 +3,14 @@ use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::waveevents::WaveEvents; use crate::{ pulse_offs_from_abs, ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, FrameTypeStatic, - IsoDateTime, RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableDyn, - TimeBinnableDynAggregator, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinned, TimeBins, WithLen, + IsoDateTime, NewEmpty, RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, + TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinned, TimeBins, WithLen, }; use chrono::{TimeZone, Utc}; use err::Error; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::NanoRange; +use netpod::{NanoRange, Shape}; use num_traits::Zero; use serde::{Deserialize, Serialize}; use std::fmt; @@ -155,6 +155,19 @@ impl WithLen for MinMaxAvgDim1Bins { } } +impl NewEmpty for MinMaxAvgDim1Bins { + fn empty(_shape: Shape) -> Self { + Self { + ts1s: Vec::new(), + ts2s: Vec::new(), + counts: Vec::new(), + mins: Vec::new(), + maxs: Vec::new(), + avgs: Vec::new(), + } + } +} + impl Appendable for MinMaxAvgDim1Bins where NTY: NumOps, @@ -171,6 +184,15 @@ where self.maxs.extend_from_slice(&src.maxs); self.avgs.extend_from_slice(&src.avgs); } + + fn append_zero(&mut self, ts1: u64, ts2: u64) { + self.ts1s.push(ts1); + self.ts2s.push(ts2); + self.counts.push(0); + self.avgs.push(None); + self.mins.push(None); + self.maxs.push(None); + } } impl ReadableFromFile for MinMaxAvgDim1Bins @@ -546,24 +568,30 @@ where 
} } -impl TimeBinnableDyn for MinMaxAvgDim1Bins { - fn aggregator_new(&self) -> Box { - todo!() - } -} +impl crate::TimeBinnableDynStub for MinMaxAvgDim1Bins {} impl TimeBinned for MinMaxAvgDim1Bins { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn { self as &dyn TimeBinnableDyn } - fn workaround_clone(&self) -> Box { - // TODO remove - panic!() + fn edges_slice(&self) -> (&[u64], &[u64]) { + (&self.ts1s[..], &self.ts2s[..]) } - fn dummy_test_i32(&self) -> i32 { - // TODO remove - panic!() + fn counts(&self) -> &[u64] { + &self.counts[..] + } + + fn avgs(&self) -> Vec { + err::todoval() + } + + fn mins(&self) -> Vec { + err::todoval() + } + + fn maxs(&self) -> Vec { + err::todoval() } } diff --git a/items/src/eventsitem.rs b/items/src/eventsitem.rs index d217d47..c50b95d 100644 --- a/items/src/eventsitem.rs +++ b/items/src/eventsitem.rs @@ -94,6 +94,12 @@ impl Appendable for EventsItem { }, } } + + fn append_zero(&mut self, _ts1: u64, _ts2: u64) { + // TODO can this implement Appendable in a sane way? Do we need it? + // TODO can we remove EventsItem? 
+ err::todo(); + } } impl PushableIndex for EventsItem { diff --git a/items/src/frame.rs b/items/src/frame.rs index b00c3f4..a21a021 100644 --- a/items/src/frame.rs +++ b/items/src/frame.rs @@ -13,15 +13,10 @@ pub fn make_frame(item: &FT) -> Result where FT: FrameType + Serialize, { - //trace!("make_frame"); if item.is_err() { make_error_frame(item.err().unwrap()) } else { - make_frame_2( - item, - //FT::FRAME_TYPE_ID - item.frame_type_id(), - ) + make_frame_2(item, item.frame_type_id()) } } @@ -29,12 +24,13 @@ pub fn make_frame_2(item: &FT, fty: u32) -> Result where FT: erased_serde::Serialize, { - //trace!("make_frame_2"); + trace!("make_frame_2 fty {:x}", fty); let mut out = vec![]; + use bincode::Options; let opts = bincode::DefaultOptions::new() - //.with_fixint_encoding() - //.allow_trailing_bytes() - ; + .with_little_endian() + .with_fixint_encoding() + .allow_trailing_bytes(); let mut ser = bincode::Serializer::new(&mut out, opts); //let mut ser = serde_json::Serializer::new(std::io::stdout()); let mut ser2 = ::erase(&mut ser); @@ -136,20 +132,41 @@ where if frame.tyid() == ERROR_FRAME_TYPE_ID { let k: ::err::Error = match bincode::deserialize(frame.buf()) { Ok(item) => item, - Err(e) => Err(e)?, + Err(e) => { + error!( + "ERROR bincode::deserialize len {} ERROR_FRAME_TYPE_ID", + frame.buf().len() + ); + let n = frame.buf().len().min(64); + let s = String::from_utf8_lossy(&frame.buf()[..n]); + error!("frame.buf as string: {:?}", s); + Err(e)? 
+ } }; Ok(T::from_error(k)) } else { let tyid = T::FRAME_TYPE_ID; if frame.tyid() != tyid { return Err(Error::with_msg(format!( - "type id mismatch expect {:x} found {:?}", - tyid, frame + "type id mismatch expect {:x} found {:x} {:?}", + tyid, + frame.tyid(), + frame ))); } match bincode::deserialize(frame.buf()) { Ok(item) => Ok(item), - Err(e) => Err(e)?, + Err(e) => { + error!( + "ERROR bincode::deserialize len {} tyid {:x}", + frame.buf().len(), + frame.tyid() + ); + let n = frame.buf().len().min(64); + let s = String::from_utf8_lossy(&frame.buf()[..n]); + error!("frame.buf as string: {:?}", s); + Err(e)? + } } } } diff --git a/items/src/items.rs b/items/src/items.rs index 5b7eaf3..7bec7f1 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -18,15 +18,15 @@ use crate::numops::BoolNum; use bytes::BytesMut; use chrono::{TimeZone, Utc}; use err::Error; -use frame::make_error_frame; #[allow(unused)] use netpod::log::*; use netpod::timeunits::{MS, SEC}; use netpod::{log::Level, AggKind, EventDataReadStats, EventQueryJsonStringFrame, NanoRange, Shape}; -use netpod::{DiskStats, RangeFilterStats}; +use netpod::{DiskStats, RangeFilterStats, ScalarType}; use numops::StringNum; use serde::de::{self, DeserializeOwned, Visitor}; use serde::{Deserialize, Serialize, Serializer}; +use std::any::Any; use std::fmt; use std::future::Future; use std::marker::PhantomData; @@ -39,15 +39,16 @@ pub const TERM_FRAME_TYPE_ID: u32 = 0x01; pub const ERROR_FRAME_TYPE_ID: u32 = 0x02; pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100; pub const EVENT_VALUES_FRAME_TYPE_ID: u32 = 0x500; -pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x800; -pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; -pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x900; -pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; pub const MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID: u32 = 0x700; -pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0xb00; +pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 
0x800; +pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; +pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0xb00; +pub const NON_DATA_FRAME_TYPE_ID: u32 = 0xc00; pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300; pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400; +pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; +pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900; pub fn bool_is_false(j: &bool) -> bool { *j == false @@ -177,7 +178,7 @@ impl SubFrId for u32 { } impl SubFrId for u64 { - const SUB: u32 = 10; + const SUB: u32 = 0xa; } impl SubFrId for i8 { @@ -197,19 +198,19 @@ impl SubFrId for i64 { } impl SubFrId for f32 { - const SUB: u32 = 11; + const SUB: u32 = 0xb; } impl SubFrId for f64 { - const SUB: u32 = 12; + const SUB: u32 = 0xc; } impl SubFrId for StringNum { - const SUB: u32 = 13; + const SUB: u32 = 0xd; } impl SubFrId for BoolNum { - const SUB: u32 = 14; + const SUB: u32 = 0xe; } // To be implemented by the data containers, i.e. the T's in Sitemty, e.g. ScalarEvents. @@ -243,9 +244,8 @@ impl FrameTypeStatic for EventQueryJsonStringFrame { impl FrameTypeStatic for Sitemty { const FRAME_TYPE_ID: u32 = ::FRAME_TYPE_ID; - fn from_error(_: err::Error) -> Self { - // TODO remove this method. - panic!() + fn from_error(e: err::Error) -> Self { + Err(e) } } @@ -310,46 +310,45 @@ impl SitemtyFrameType for Box { } } +impl SitemtyFrameType for Box { + fn frame_type_id(&self) -> u32 { + self.as_time_binnable_dyn().frame_type_id() + } +} + // TODO do we need Send here? 
pub trait Framable { fn make_frame(&self) -> Result; } -// erased_serde::Serialize -pub trait FramableInner: SitemtyFrameType + Send { +pub trait FramableInner: erased_serde::Serialize + SitemtyFrameType + Send { fn _dummy(&self); } -// erased_serde::Serialize` -impl FramableInner for T { +impl FramableInner for T { fn _dummy(&self) {} } -//impl FramableInner for Box {} +erased_serde::serialize_trait_object!(EventsDyn); +erased_serde::serialize_trait_object!(TimeBinnableDyn); +erased_serde::serialize_trait_object!(TimeBinned); -// TODO need also Framable for those types defined in other crates. -// TODO not all T have FrameTypeStatic, e.g. Box impl Framable for Sitemty -//where -//Self: erased_serde::Serialize, -//T: FramableInner + FrameTypeStatic, -//T: Sized, +where + T: Sized + serde::Serialize + SitemtyFrameType, { fn make_frame(&self) -> Result { - todo!() - } - - /*fn make_frame(&self) -> Result { - //trace!("make_frame"); match self { - Ok(_) => make_frame_2( - self, - //T::FRAME_TYPE_ID - self.frame_type_id(), - ), - Err(e) => make_error_frame(e), + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => { + let frame_type_id = k.frame_type_id(); + make_frame_2(self, frame_type_id) + } + _ => { + let frame_type_id = NON_DATA_FRAME_TYPE_ID; + make_frame_2(self, frame_type_id) + } } - }*/ + } } impl Framable for Box @@ -421,6 +420,7 @@ pub trait ByteEstimate { } pub trait RangeOverlapInfo { + // TODO do not take by value. 
fn ends_before(&self, range: NanoRange) -> bool; fn ends_after(&self, range: NanoRange) -> bool; fn starts_after(&self, range: NanoRange) -> bool; @@ -439,9 +439,16 @@ pub trait PushableIndex { fn push_index(&mut self, src: &Self, ix: usize); } +pub trait NewEmpty { + fn empty(shape: Shape) -> Self; +} + pub trait Appendable: WithLen { fn empty_like_self(&self) -> Self; fn append(&mut self, src: &Self); + + // TODO the `ts2` makes no sense for non-bin-implementors + fn append_zero(&mut self, ts1: u64, ts2: u64); } pub trait Clearable { @@ -462,7 +469,15 @@ pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside { } pub trait TimeBinnableType: - Send + Unpin + RangeOverlapInfo + FilterFittingInside + Appendable + Serialize + ReadableFromFile + FrameTypeStatic + Send + + Unpin + + RangeOverlapInfo + + FilterFittingInside + + NewEmpty + + Appendable + + Serialize + + ReadableFromFile + + FrameTypeStatic { type Output: TimeBinnableType; type Aggregator: TimeBinnableTypeAggregator + Send + Unpin; @@ -474,33 +489,81 @@ pub trait TimeBinnableType: // TODO should not require Sync! // TODO SitemtyFrameType is already supertrait of FramableInner. 
-pub trait TimeBinnableDyn: FramableInner + SitemtyFrameType + Sync + Send { - fn aggregator_new(&self) -> Box; +pub trait TimeBinnableDyn: + std::fmt::Debug + FramableInner + SitemtyFrameType + WithLen + RangeOverlapInfo + Any + Sync + Send + 'static +{ + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box; + fn as_any(&self) -> &dyn Any; } +pub trait TimeBinnableDynStub: + std::fmt::Debug + FramableInner + SitemtyFrameType + WithLen + RangeOverlapInfo + Any + Sync + Send + 'static +{ +} + +// impl for the stubs TODO: remove +impl TimeBinnableDyn for T +where + T: TimeBinnableDynStub, +{ + fn time_binner_new(&self, _edges: Vec, _do_time_weight: bool) -> Box { + error!("TODO impl time_binner_new for T {}", std::any::type_name::()); + err::todoval() + } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } +} + +// TODO maybe this is no longer needed: pub trait TimeBinnableDynAggregator: Send { fn ingest(&mut self, item: &dyn TimeBinnableDyn); fn result(&mut self) -> Box; } /// Container of some form of events, for use as trait object. -pub trait EventsDyn: TimeBinnableDyn {} +pub trait EventsDyn: TimeBinnableDyn { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn; +} /// Data in time-binned form. 
pub trait TimeBinned: TimeBinnableDyn { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn; - fn workaround_clone(&self) -> Box; - fn dummy_test_i32(&self) -> i32; + fn edges_slice(&self) -> (&[u64], &[u64]); + fn counts(&self) -> &[u64]; + fn mins(&self) -> Vec; + fn maxs(&self) -> Vec; + fn avgs(&self) -> Vec; } -// TODO this impl is already covered by the generic one: -/*impl FramableInner for Box { - fn _dummy(&self) {} -}*/ +impl WithLen for Box { + fn len(&self) -> usize { + self.as_time_binnable_dyn().len() + } +} + +impl RangeOverlapInfo for Box { + fn ends_before(&self, range: NanoRange) -> bool { + self.as_time_binnable_dyn().ends_before(range) + } + + fn ends_after(&self, range: NanoRange) -> bool { + self.as_time_binnable_dyn().ends_after(range) + } + + fn starts_after(&self, range: NanoRange) -> bool { + self.as_time_binnable_dyn().starts_after(range) + } +} impl TimeBinnableDyn for Box { - fn aggregator_new(&self) -> Box { - self.as_time_binnable_dyn().aggregator_new() + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { + self.as_time_binnable_dyn().time_binner_new(edges, do_time_weight) + } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any } } @@ -621,3 +684,119 @@ pub fn inspect_timestamps(events: &dyn TimestampInspectable, range: NanoRange) - } buf } + +pub trait TimeBinnerDyn: Send { + fn bins_ready_count(&self) -> usize; + fn bins_ready(&mut self) -> Option>; + fn ingest(&mut self, item: &dyn TimeBinnableDyn); + + /// Caller indicates that there will be no more data for the current bin. + /// Implementor is expected to prepare processing the next bin. + /// The next call to `Self::bins_ready_count` must return one higher count than before. 
+ fn cycle(&mut self); +} + +pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { + match shape { + Shape::Scalar => match agg_kind { + AggKind::TimeWeightedScalar => { + use ScalarType::*; + type K = scalarevents::ScalarEvents; + match scalar_type { + U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + _ => err::todoval(), + } + } + _ => err::todoval(), + }, + Shape::Wave(_n) => match agg_kind { + AggKind::DimXBins1 => { + use ScalarType::*; + type K = waveevents::WaveEvents; + match scalar_type { + U8 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + _ => err::todoval(), + } + } + _ => err::todoval(), + }, + Shape::Image(..) => err::todoval(), + } +} + +pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { + match shape { + Shape::Scalar => match agg_kind { + AggKind::TimeWeightedScalar => { + use ScalarType::*; + type K = binsdim0::MinMaxAvgDim0Bins; + match scalar_type { + U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + _ => err::todoval(), + } + } + _ => err::todoval(), + }, + Shape::Wave(_n) => match agg_kind { + AggKind::DimXBins1 => { + use ScalarType::*; + type K = binsdim0::MinMaxAvgDim0Bins; + match scalar_type { + U8 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + _ => err::todoval(), + } + } + _ => err::todoval(), + }, + Shape::Image(..) 
=> err::todoval(), + } +} + +#[test] +fn bin_binned_01() { + use binsdim0::MinMaxAvgDim0Bins; + let edges = vec![SEC * 1000, SEC * 1010, SEC * 1020]; + let inp0 = as NewEmpty>::empty(Shape::Scalar); + let mut time_binner = inp0.time_binner_new(edges, true); + let inp1 = MinMaxAvgDim0Bins:: { + ts1s: vec![SEC * 1000, SEC * 1010], + ts2s: vec![SEC * 1010, SEC * 1020], + counts: vec![1, 1], + mins: vec![3, 4], + maxs: vec![10, 9], + avgs: vec![7., 6.], + }; + assert_eq!(time_binner.bins_ready_count(), 0); + time_binner.ingest(&inp1); + assert_eq!(time_binner.bins_ready_count(), 1); + time_binner.cycle(); + assert_eq!(time_binner.bins_ready_count(), 2); + time_binner.cycle(); + //assert_eq!(time_binner.bins_ready_count(), 2); + let bins = time_binner.bins_ready().expect("bins should be ready"); + eprintln!("bins: {:?}", bins); + assert_eq!(bins.counts().len(), 2); + assert_eq!(time_binner.bins_ready_count(), 0); +} diff --git a/items/src/numops.rs b/items/src/numops.rs index 4a38093..055f49e 100644 --- a/items/src/numops.rs +++ b/items/src/numops.rs @@ -115,6 +115,7 @@ pub trait NumOps: + AsPrimF32 + Send + Sync + + 'static + Unpin + Debug + Zero diff --git a/items/src/plainevents.rs b/items/src/plainevents.rs index 169eec6..2a33d73 100644 --- a/items/src/plainevents.rs +++ b/items/src/plainevents.rs @@ -50,6 +50,11 @@ impl Appendable for ScalarPlainEvents { } }) } + + fn append_zero(&mut self, _ts1: u64, _ts2: u64) { + // TODO can this implement Appendable in a sane way? Do we need it? + err::todo(); + } } impl PushableIndex for ScalarPlainEvents { @@ -157,6 +162,11 @@ impl Appendable for WavePlainEvents { _ => panic!(), } }) } + + fn append_zero(&mut self, _ts1: u64, _ts2: u64) { + // TODO can this implement Appendable in a sane way? Do we need it? 
+ err::todo(); + } } impl PushableIndex for WavePlainEvents { @@ -253,6 +263,11 @@ impl Appendable for PlainEvents { }, } } + + fn append_zero(&mut self, _ts1: u64, _ts2: u64) { + // TODO can this implement Appendable in a sane way? Do we need it? + err::todo(); + } } impl PushableIndex for PlainEvents { diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs index ad03081..29744f0 100644 --- a/items/src/scalarevents.rs +++ b/items/src/scalarevents.rs @@ -3,13 +3,16 @@ use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ pulse_offs_from_abs, ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, - FilterFittingInside, Fits, FitsInside, FrameTypeStatic, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, - SitemtyFrameType, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + FilterFittingInside, Fits, FitsInside, FrameTypeStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, + ReadableFromFile, SitemtyFrameType, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinnerDyn, + WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; -use netpod::NanoRange; +use netpod::{NanoRange, Shape}; use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::collections::VecDeque; use std::fmt; use tokio::fs::File; @@ -195,6 +198,16 @@ where } } +impl NewEmpty for ScalarEvents { + fn empty(_shape: Shape) -> Self { + Self { + tss: Vec::new(), + pulses: Vec::new(), + values: Vec::new(), + } + } +} + impl Appendable for ScalarEvents where NTY: NumOps, @@ -206,6 +219,12 @@ where fn append(&mut self, src: &Self) { self.extend_from_slice(src); } + + fn append_zero(&mut self, ts1: u64, _ts2: u64) { + self.tss.push(ts1); + self.pulses.push(0); + self.values.push(NTY::zero()); + } } impl Clearable for ScalarEvents { @@ -335,14 +354,26 @@ where pub struct EventValuesAggregator { range: NanoRange, count: u64, - min: Option, - max: Option, 
+ min: NTY, + max: NTY, sumc: u64, sum: f32, int_ts: u64, last_ts: u64, last_val: Option, do_time_weight: bool, + events_taken_count: u64, + events_ignored_count: u64, +} + +impl Drop for EventValuesAggregator { + fn drop(&mut self) { + // TODO collect as stats for the request context: + warn!( + "taken {} ignored {}", + self.events_taken_count, self.events_ignored_count + ); + } } impl EventValuesAggregator @@ -354,39 +385,32 @@ where Self { range, count: 0, - min: None, - max: None, + min: NTY::zero(), + max: NTY::zero(), sum: 0f32, sumc: 0, int_ts, last_ts: 0, last_val: None, do_time_weight, + events_taken_count: 0, + events_ignored_count: 0, } } // TODO reduce clone.. optimize via more traits to factor the trade-offs? fn apply_min_max(&mut self, val: NTY) { - self.min = match &self.min { - None => Some(val.clone()), - Some(min) => { - if &val < min { - Some(val.clone()) - } else { - Some(min.clone()) - } + if self.count == 0 { + self.min = val.clone(); + self.max = val; + } else { + if val < self.min { + self.min = val.clone(); } - }; - self.max = match &self.max { - None => Some(val), - Some(max) => { - if &val > max { - Some(val) - } else { - Some(max.clone()) - } + if val > self.max { + self.max = val; } - }; + } } fn apply_event_unweight(&mut self, val: NTY) { @@ -428,10 +452,14 @@ where let ts = item.tss[i1]; let val = item.values[i1].clone(); if ts < self.range.beg { + self.events_ignored_count += 1; } else if ts >= self.range.end { + self.events_ignored_count += 1; + return; } else { self.apply_event_unweight(val); self.count += 1; + self.events_taken_count += 1; } } } @@ -441,11 +469,11 @@ where let ts = item.tss[i1]; let val = item.values[i1].clone(); if ts < self.int_ts { - debug!("just set int_ts"); + self.events_ignored_count += 1; self.last_ts = ts; self.last_val = Some(val); } else if ts >= self.range.end { - debug!("after range"); + self.events_ignored_count += 1; return; } else { debug!("regular"); @@ -453,15 +481,16 @@ where self.count += 1; 
self.last_ts = ts; self.last_val = Some(val); + self.events_taken_count += 1; } } } fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgDim0Bins { let avg = if self.sumc == 0 { - None + 0f32 } else { - Some(self.sum / self.sumc as f32) + self.sum / self.sumc as f32 }; let ret = MinMaxAvgDim0Bins { ts1s: vec![self.range.beg], @@ -474,8 +503,8 @@ where self.int_ts = range.beg; self.range = range; self.count = 0; - self.min = None; - self.max = None; + self.min = NTY::zero(); + self.max = NTY::zero(); self.sum = 0f32; self.sumc = 0; ret @@ -491,7 +520,7 @@ where } let avg = { let sc = self.range.delta() as f32 * 1e-9; - Some(self.sum / sc) + self.sum / sc }; let ret = MinMaxAvgDim0Bins { ts1s: vec![self.range.beg], @@ -504,8 +533,8 @@ where self.int_ts = range.beg; self.range = range; self.count = 0; - self.min = None; - self.max = None; + self.min = NTY::zero(); + self.max = NTY::zero(); self.sum = 0f32; self.sumc = 0; ret @@ -555,10 +584,162 @@ where } } -impl TimeBinnableDyn for ScalarEvents { - fn aggregator_new(&self) -> Box { - todo!() +impl TimeBinnableDyn for ScalarEvents { + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { + eprintln!("ScalarEvents time_binner_new"); + info!("ScalarEvents time_binner_new"); + let ret = ScalarEventsTimeBinner::::new(edges.into(), do_time_weight); + Box::new(ret) + } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any } } -impl EventsDyn for ScalarEvents {} +impl EventsDyn for ScalarEvents { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn { + self as &dyn TimeBinnableDyn + } +} + +pub struct ScalarEventsTimeBinner { + edges: VecDeque, + do_time_weight: bool, + range: NanoRange, + agg: Option>, + ready: Option< as TimeBinnableTypeAggregator>::Output>, +} + +impl ScalarEventsTimeBinner { + fn new(edges: VecDeque, do_time_weight: bool) -> Self { + let range = if edges.len() >= 2 { + NanoRange { + beg: edges[0], + end: edges[1], + } + } else { + // Using a dummy for this 
case. + NanoRange { beg: 1, end: 2 } + }; + Self { + edges, + do_time_weight, + range, + agg: None, + ready: None, + } + } + + // Move the bin from the current aggregator (if any) to our output collection, + // and step forward in our bin list. + fn cycle(&mut self) { + // TODO expand should be derived from AggKind. Is it still required after all? + let expand = true; + if let Some(agg) = self.agg.as_mut() { + let mut h = agg.result_reset(self.range.clone(), expand); + match self.ready.as_mut() { + Some(fin) => { + fin.append(&mut h); + } + None => { + self.ready = Some(h); + } + } + } else { + let mut h = MinMaxAvgDim0Bins::::empty(); + h.append_zero(self.range.beg, self.range.end); + match self.ready.as_mut() { + Some(fin) => { + fin.append(&mut h); + } + None => { + self.ready = Some(h); + } + } + } + self.edges.pop_front(); + if self.edges.len() >= 2 { + self.range = NanoRange { + beg: self.edges[0], + end: self.edges[1], + }; + } else { + // Using a dummy for this case. + self.range = NanoRange { beg: 1, end: 2 }; + } + } +} + +impl TimeBinnerDyn for ScalarEventsTimeBinner { + fn cycle(&mut self) { + Self::cycle(self) + } + + fn ingest(&mut self, item: &dyn TimeBinnableDyn) { + if item.len() == 0 { + // Return already here, RangeOverlapInfo would not give much sense. + return; + } + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for ScalarEventsTimeBinner no more bin in edges A"); + return; + } + // TODO optimize by remembering at which event array index we have arrived. + // That needs modified interfaces which can take and yield the start and latest index. 
+ loop { + while item.starts_after(self.range.clone()) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for ScalarEventsTimeBinner no more bin in edges B"); + return; + } + } + if item.ends_before(self.range.clone()) { + return; + } else { + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for ScalarEventsTimeBinner edge list exhausted"); + return; + } else { + if self.agg.is_none() { + self.agg = Some(EventValuesAggregator::new(self.range.clone(), self.do_time_weight)); + } + let agg = self.agg.as_mut().unwrap(); + if let Some(item) = item + .as_any() + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + { + // TODO collect statistics associated with this request: + agg.ingest(item); + } else { + error!("not correct item type"); + }; + if item.ends_after(self.range.clone()) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for ScalarEventsTimeBinner no more bin in edges C"); + return; + } + } else { + break; + } + } + } + } + } + + fn bins_ready_count(&self) -> usize { + match &self.ready { + Some(k) => k.len(), + None => 0, + } + } + + fn bins_ready(&mut self) -> Option> { + match self.ready.take() { + Some(k) => Some(Box::new(k)), + None => None, + } + } +} diff --git a/items/src/statsevents.rs b/items/src/statsevents.rs index 5b91660..1ad727d 100644 --- a/items/src/statsevents.rs +++ b/items/src/statsevents.rs @@ -1,12 +1,12 @@ use crate::streams::{Collectable, Collector}; use crate::{ ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside, - FrameTypeStatic, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, - TimeBinnableTypeAggregator, WithLen, WithTimestamps, + FrameTypeStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, + TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; -use netpod::NanoRange; +use 
netpod::{NanoRange, Shape}; use serde::{Deserialize, Serialize}; use std::fmt; use tokio::fs::File; @@ -141,6 +141,15 @@ impl PushableIndex for StatsEvents { } } +impl NewEmpty for StatsEvents { + fn empty(_shape: Shape) -> Self { + Self { + tss: Vec::new(), + pulses: Vec::new(), + } + } +} + impl Appendable for StatsEvents { fn empty_like_self(&self) -> Self { Self::empty() @@ -150,6 +159,11 @@ impl Appendable for StatsEvents { self.tss.extend_from_slice(&src.tss); self.pulses.extend_from_slice(&src.pulses); } + + fn append_zero(&mut self, ts1: u64, _ts2: u64) { + self.tss.push(ts1); + self.pulses.push(0); + } } impl Clearable for StatsEvents { diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index 3cb91f3..d83415e 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -4,8 +4,8 @@ use crate::xbinnedscalarevents::XBinnedScalarEvents; use crate::xbinnedwaveevents::XBinnedWaveEvents; use crate::{ Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, EventsNodeProcessor, FilterFittingInside, Fits, - FitsInside, FrameTypeStatic, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, - TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + FitsInside, FrameTypeStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, + SitemtyFrameType, SubFrId, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; @@ -161,6 +161,16 @@ where } } +impl NewEmpty for WaveEvents { + fn empty(_shape: Shape) -> Self { + Self { + tss: Vec::new(), + pulses: Vec::new(), + vals: Vec::new(), + } + } +} + impl Appendable for WaveEvents where NTY: NumOps, @@ -173,6 +183,12 @@ where self.tss.extend_from_slice(&src.tss); self.vals.extend_from_slice(&src.vals); } + + fn append_zero(&mut self, ts1: u64, _ts2: u64) { + self.tss.push(ts1); + self.pulses.push(0); + self.vals.push(Vec::new()); + } } impl 
Clearable for WaveEvents { @@ -509,10 +525,10 @@ where } } -impl TimeBinnableDyn for WaveEvents { - fn aggregator_new(&self) -> Box { - todo!() +impl crate::TimeBinnableDynStub for WaveEvents {} + +impl EventsDyn for WaveEvents { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn { + self as &dyn TimeBinnableDyn } } - -impl EventsDyn for WaveEvents {} diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 66e8c19..28e1725 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -2,13 +2,13 @@ use crate::binsdim0::MinMaxAvgDim0Bins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - ts_offs_from_abs, Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, PushableIndex, - RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, - TimeBinnableTypeAggregator, WithLen, WithTimestamps, FrameTypeStatic, + ts_offs_from_abs, Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, FrameTypeStatic, + NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, + TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; -use netpod::NanoRange; +use netpod::{NanoRange, Shape}; use serde::{Deserialize, Serialize}; use tokio::fs::File; @@ -145,6 +145,17 @@ where } } +impl NewEmpty for XBinnedScalarEvents { + fn empty(_shape: Shape) -> Self { + Self { + tss: Vec::new(), + avgs: Vec::new(), + mins: Vec::new(), + maxs: Vec::new(), + } + } +} + impl Appendable for XBinnedScalarEvents where NTY: NumOps, @@ -159,6 +170,13 @@ where self.maxs.extend_from_slice(&src.maxs); self.avgs.extend_from_slice(&src.avgs); } + + fn append_zero(&mut self, ts1: u64, _ts2: u64) { + self.tss.push(ts1); + self.mins.push(NTY::zero()); + self.maxs.push(NTY::zero()); + self.avgs.push(0.); + } } impl Clearable for XBinnedScalarEvents { @@ -206,8 
+224,8 @@ where { range: NanoRange, count: u64, - min: Option, - max: Option, + min: NTY, + max: NTY, sumc: u64, sum: f32, int_ts: u64, @@ -227,8 +245,8 @@ where int_ts: range.beg, range, count: 0, - min: None, - max: None, + min: NTY::zero(), + max: NTY::zero(), sumc: 0, sum: 0f32, last_ts: 0, @@ -240,26 +258,17 @@ where } fn apply_min_max(&mut self, min: NTY, max: NTY) { - self.min = match &self.min { - None => Some(min), - Some(cmin) => { - if &min < cmin { - Some(min) - } else { - Some(cmin.clone()) - } + if self.count == 0 { + self.min = min; + self.max = max; + } else { + if min < self.min { + self.min = min; } - }; - self.max = match &self.max { - None => Some(max), - Some(cmax) => { - if &max > cmax { - Some(max) - } else { - Some(cmax.clone()) - } + if max > self.max { + self.max = max; } - }; + } } fn apply_event_unweight(&mut self, avg: f32, min: NTY, max: NTY) { @@ -330,9 +339,9 @@ where fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgDim0Bins { let avg = if self.sumc == 0 { - None + 0f32 } else { - Some(self.sum / self.sumc as f32) + self.sum / self.sumc as f32 }; let ret = MinMaxAvgDim0Bins { ts1s: vec![self.range.beg], @@ -345,8 +354,8 @@ where self.int_ts = range.beg; self.range = range; self.count = 0; - self.min = None; - self.max = None; + self.min = NTY::zero(); + self.max = NTY::zero(); self.sum = 0f32; self.sumc = 0; ret @@ -359,7 +368,7 @@ where } let avg = { let sc = self.range.delta() as f32 * 1e-9; - Some(self.sum / sc) + self.sum / sc }; let ret = MinMaxAvgDim0Bins { ts1s: vec![self.range.beg], @@ -372,8 +381,8 @@ where self.int_ts = range.beg; self.range = range; self.count = 0; - self.min = None; - self.max = None; + self.min = NTY::zero(); + self.max = NTY::zero(); self.sum = 0f32; self.sumc = 0; ret diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index 96d6d32..7eef9a9 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -2,14 +2,14 @@ use 
crate::binsdim1::MinMaxAvgDim1Bins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, FrameTypeStatic, PushableIndex, - RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, + Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, FrameTypeStatic, NewEmpty, + PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; use netpod::timeunits::*; -use netpod::NanoRange; +use netpod::{NanoRange, Shape}; use serde::{Deserialize, Serialize}; use std::mem; use tokio::fs::File; @@ -147,6 +147,17 @@ where } } +impl NewEmpty for XBinnedWaveEvents { + fn empty(_shape: Shape) -> Self { + Self { + tss: Vec::new(), + avgs: Vec::new(), + mins: Vec::new(), + maxs: Vec::new(), + } + } +} + impl Appendable for XBinnedWaveEvents where NTY: NumOps, @@ -161,6 +172,13 @@ where self.maxs.extend_from_slice(&src.maxs); self.avgs.extend_from_slice(&src.avgs); } + + fn append_zero(&mut self, ts1: u64, _ts2: u64) { + self.tss.push(ts1); + self.mins.push(Vec::new()); + self.maxs.push(Vec::new()); + self.avgs.push(Vec::new()); + } } impl Clearable for XBinnedWaveEvents { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index ad63073..10e33d3 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -527,38 +527,69 @@ pub struct NodeStatus { pub archiver_appliance_status: Option, } -/** -Describes a "channel" which is a time-series with a unique name within a "backend". -In the near future, each channel should have assigned a unique id within a "backend". 
-Also the concept of "backend" should be split into "facility" and some optional other identifier -for cases like post-mortem, or to differentiate between channel-access and bsread for cases where -the same channel-name is delivered via different methods. -*/ +// Describes a "channel" which is a time-series with a unique name within a "backend". +// Also the concept of "backend" could be split into "facility" and some optional other identifier +// for cases like e.g. post-mortem, or to differentiate between channel-access and bsread for cases where +// the same channel-name is delivered via different methods. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Channel { - // TODO ideally, all channels would have a unique id. For scylla backends, we require the id. - // In the near future, we should also require a unique id for "databuffer" backends, indexed in postgres. pub series: Option, // "backend" is currently used in the existing systems for multiple purposes: - // it can indicate the facility (eg. sf-databuffer, hipa, ...) but also some special subsystem (eg. sf-rf-databuffer). + // it can indicate the facility (eg. sf-databuffer, hipa, ...) but also + // some special subsystem (eg. sf-rf-databuffer). pub backend: String, pub name: String, } impl Channel { + pub fn backend(&self) -> &str { + &self.backend + } + pub fn name(&self) -> &str { &self.name } - pub fn series_id(&self) -> Result { + pub fn series(&self) -> Option { self.series - .ok_or_else(|| Error::with_msg_no_trace(format!("no series id in channel"))) } } -/** +impl FromUrl for Channel { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let ret = Channel { + backend: pairs + .get("channelBackend") + .ok_or(Error::with_public_msg("missing channelBackend"))? + .into(), + name: pairs + .get("channelName") + .ok_or(Error::with_public_msg("missing channelName"))? 
+ .into(), + series: pairs + .get("seriesId") + .and_then(|x| x.parse::().map_or(None, |x| Some(x))), + }; + Ok(ret) + } +} + +impl AppendToUrl for Channel { + fn append_to_url(&self, url: &mut Url) { + let mut g = url.query_pairs_mut(); + g.append_pair("channelBackend", &self.backend); + g.append_pair("channelName", &self.name); + if let Some(series) = self.series { + g.append_pair("seriesId", &format!("{series}")); + } + } +} -*/ #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelTyped { pub channel: Channel, @@ -567,8 +598,8 @@ pub struct ChannelTyped { } impl ChannelTyped { - pub fn series_id(&self) -> Result { - self.channel.series_id() + pub fn channel(&self) -> &Channel { + &self.channel } } @@ -770,7 +801,6 @@ impl<'de> serde::de::Visitor<'de> for ShapeVis { where E: serde::de::Error, { - info!("visit_str {v}"); if v == "Scalar" { Ok(Shape::Scalar) } else { @@ -784,7 +814,6 @@ impl<'de> serde::de::Visitor<'de> for ShapeVis { { use serde::de::Error; while let Some(key) = map.next_key::()? { - info!("See key {key:?}"); return if key == "Wave" { let n: u32 = map.next_value()?; Ok(Shape::Wave(n)) @@ -802,7 +831,6 @@ impl<'de> serde::de::Visitor<'de> for ShapeVis { where A: serde::de::SeqAccess<'de>, { - info!("visit_seq"); let mut a = vec![]; while let Some(item) = seq.next_element()? { let n: u32 = item; @@ -827,23 +855,6 @@ impl<'de> Deserialize<'de> for Shape { D: serde::Deserializer<'de>, { de.deserialize_any(ShapeVis) - /* - // TODO can not clone.. how to try the alternatives? - match de.deserialize_str(ShapeVis) { - Ok(k) => { - info!("De worked first try: {k:?}"); - Ok(k) - } - Err(_) => { - let ret = match >::deserialize(de)? 
{ - ShapeOld::Scalar => Shape::Scalar, - ShapeOld::Wave(a) => Shape::Wave(a), - ShapeOld::Image(a, b) => Shape::Image(a, b), - }; - Ok(ret) - } - } - */ } } @@ -1012,42 +1023,34 @@ pub mod timeunits { pub const DAY: u64 = HOUR * 24; } -//const BIN_T_LEN_OPTIONS_0: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; - -//const PATCH_T_LEN_KEY: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY]; - -//const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [MIN * 60, HOUR * 4, DAY * 4, DAY * 32]; -// Maybe alternative for GLS: -//const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 4] = [HOUR * 4, DAY * 4, DAY * 16, DAY * 32]; - -//const PATCH_T_LEN_OPTIONS_WAVE: [u64; 4] = [MIN * 10, HOUR * 2, DAY * 4, DAY * 32]; - -const BIN_T_LEN_OPTIONS_0: [u64; 2] = [ +const BIN_T_LEN_OPTIONS_0: [u64; 3] = [ // //SEC, - //MIN * 10, - HOUR * 2, + MIN * 1, + HOUR * 1, DAY, ]; -const PATCH_T_LEN_KEY: [u64; 2] = [ +const PATCH_T_LEN_KEY: [u64; 3] = [ // //SEC, - //MIN * 10, - HOUR * 2, + MIN * 1, + HOUR * 1, DAY, ]; -const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 2] = [ + +const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 3] = [ // //MIN * 60, - //HOUR * 4, - DAY * 8, - DAY * 32, + HOUR * 6, + DAY * 16, + DAY * 64, ]; -const PATCH_T_LEN_OPTIONS_WAVE: [u64; 2] = [ + +const PATCH_T_LEN_OPTIONS_WAVE: [u64; 3] = [ // //MIN * 10, - //HOUR * 2, + HOUR * 6, DAY * 8, DAY * 32, ]; @@ -1207,8 +1210,32 @@ impl PreBinnedPatchRange { } } } + + pub fn edges(&self) -> Vec { + let mut ret = vec![]; + let mut t = self.grid_spec.patch_t_len() * self.offset; + ret.push(t); + let bin_count = self.grid_spec.patch_t_len() / self.grid_spec.bin_t_len() * self.count; + let bin_len = self.grid_spec.bin_t_len(); + for _ in 0..bin_count { + t += bin_len; + ret.push(t); + } + ret + } + + pub fn patch_count(&self) -> u64 { + self.count + } + + pub fn bin_count(&self) -> u64 { + self.grid_spec.patch_t_len() / self.grid_spec.bin_t_len() * self.patch_count() + } } +/// Identifies one patch on the binning grid at a certain resolution. 
+/// A patch consists of `bin_count` consecutive bins. +/// In total, a given `PreBinnedPatchCoord` spans a time range from `patch_beg` to `patch_end`. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PreBinnedPatchCoord { spec: PreBinnedPatchGridSpec, @@ -1257,6 +1284,17 @@ impl PreBinnedPatchCoord { ix: patch_ix, } } + + pub fn edges(&self) -> Vec { + let mut ret = vec![]; + let mut t = self.patch_beg(); + ret.push(t); + for _ in 0..self.bin_count() { + t += self.bin_t_len(); + ret.push(t); + } + ret + } } impl AppendToUrl for PreBinnedPatchCoord { @@ -1761,32 +1799,6 @@ impl Default for DiskIoTune { } } -pub fn channel_from_pairs(pairs: &BTreeMap) -> Result { - let ret = Channel { - backend: pairs - .get("channelBackend") - .ok_or(Error::with_public_msg("missing channelBackend"))? - .into(), - name: pairs - .get("channelName") - .ok_or(Error::with_public_msg("missing channelName"))? - .into(), - series: pairs - .get("seriesId") - .and_then(|x| x.parse::().map_or(None, |x| Some(x))), - }; - Ok(ret) -} - -pub fn channel_append_to_url(url: &mut Url, channel: &Channel) { - let mut qp = url.query_pairs_mut(); - qp.append_pair("channelBackend", &channel.backend); - qp.append_pair("channelName", &channel.name); - if let Some(series) = &channel.series { - qp.append_pair("seriesId", &format!("{}", series)); - } -} - #[derive(Debug, Serialize, Deserialize)] pub struct ChannelSearchQuery { pub backend: Option, @@ -1889,6 +1901,8 @@ pub trait HasTimeout { pub trait FromUrl: Sized { fn from_url(url: &Url) -> Result; + // TODO put this in separate trait, because some implementors need url path segments to construct. + fn from_pairs(pairs: &BTreeMap) -> Result; } pub trait AppendToUrl { @@ -1899,12 +1913,10 @@ pub fn get_url_query_pairs(url: &Url) -> BTreeMap { BTreeMap::from_iter(url.query_pairs().map(|(j, k)| (j.to_string(), k.to_string()))) } -/** -Request type of the channel/config api. 
\ -At least on some backends the channel configuration may change depending on the queried range. -Therefore, the query includes the range. -The presence of a configuration in some range does not imply that there is any data available. -*/ +// Request type of the channel/config api. +// At least on some backends the channel configuration may change depending on the queried range. +// Therefore, the query includes the range. +// The presence of a configuration in some range does not imply that there is any data available. #[derive(Debug, Serialize, Deserialize)] pub struct ChannelConfigQuery { pub channel: Channel, @@ -1927,11 +1939,15 @@ impl HasTimeout for ChannelConfigQuery { impl FromUrl for ChannelConfigQuery { fn from_url(url: &Url) -> Result<Self, Error> { let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> { let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?; let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?; let expand = pairs.get("expand").map(|s| s == "true").unwrap_or(false); let ret = Self { - channel: channel_from_pairs(&pairs)?, + channel: Channel::from_pairs(&pairs)?, range: NanoRange { beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(), end: end_date.parse::<DateTime<Utc>>()?.to_nanos(), @@ -1945,7 +1961,7 @@ impl FromUrl for ChannelConfigQuery { impl AppendToUrl for ChannelConfigQuery { fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - channel_append_to_url(url, &self.channel); + self.channel.append_to_url(url); let mut g = url.query_pairs_mut(); g.append_pair( "begDate", diff --git a/netpod/src/query.rs index 0eaf46c..33e2ded 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -1,6 +1,5 @@ use crate::{ - channel_append_to_url, channel_from_pairs, get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, - HasBackend, HasTimeout, NanoRange, ToNanos, + get_url_query_pairs, 
AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos, }; use crate::{log::*, DiskIoTune}; use chrono::{DateTime, TimeZone, Utc}; @@ -171,6 +170,14 @@ impl BinnedQuery { self.do_log } + pub fn set_series_id(&mut self, series: u64) { + self.channel.series = Some(series); + } + + pub fn channel_mut(&mut self) -> &mut Channel { + &mut self.channel + } + pub fn set_cache_usage(&mut self, k: CacheUsage) { self.cache_usage = k; } @@ -203,6 +210,10 @@ impl HasTimeout for BinnedQuery { impl FromUrl for BinnedQuery { fn from_url(url: &Url) -> Result<Self, Error> { let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> { let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; let disk_stats_every = pairs.get("diskStatsEveryKb").map_or("2000", |k| k); let disk_stats_every = disk_stats_every .parse() .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?; let ret = Self { - channel: channel_from_pairs(&pairs)?, + channel: Channel::from_pairs(&pairs)?, range: NanoRange { beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(), end: end_date.parse::<DateTime<Utc>>()?.to_nanos(), @@ -220,7 +231,7 @@ impl FromUrl for BinnedQuery { .ok_or(Error::with_msg("missing binCount"))? 
.parse() .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?, - agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1), + agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::TimeWeightedScalar), cache_usage: CacheUsage::from_pairs(&pairs)?, disk_io_buffer_size: pairs .get("diskIoBufferSize") @@ -259,7 +270,7 @@ impl AppendToUrl for BinnedQuery { fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; { - channel_append_to_url(url, &self.channel); + self.channel.append_to_url(url); let mut g = url.query_pairs_mut(); g.append_pair("cacheUsage", &self.cache_usage.to_string()); g.append_pair("binCount", &format!("{}", self.bin_count)); diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 5ef31ff..0bec19c 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -4,7 +4,7 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::frame::{decode_frame, make_term_frame}; -use items::{Framable, RangeCompletableItem, StreamItem}; +use items::{Framable, StreamItem}; use netpod::histo::HistoLog2; use netpod::log::*; use netpod::query::RawEventsQuery;