From ceb995f8ca952f3eecd11a2d8e7a161b79de6294 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 11 Nov 2021 19:15:26 +0100
Subject: [PATCH] Refactor: move query types to netpod and thread expand flag through the archive engine

Move BinnedQuery and CacheUsage from disk::binned::query to netpod::query,
pass the expand flag through blockref_stream, ChannelConfigQuery and
Rtree::iter_range, let RecordIter stop once it has passed the requested
range, and add range/expand tests for BlockStream.

---
archapp/src/archeng.rs | 5 +-
archapp/src/archeng/blockrefstream.rs | 154 +++++------
archapp/src/archeng/blockstream.rs | 345 ++++++++++++++++++++++++-
archapp/src/archeng/configs.rs | 1 +
archapp/src/archeng/datablockstream.rs | 8 +-
archapp/src/archeng/indextree.rs | 161 +++++++++---
archapp/src/archeng/pipe.rs | 16 +-
archapp/src/events.rs | 71 ++++-
daqbuffer/src/bin/daqbuffer.rs | 2 +-
daqbufp2/src/client.rs | 6 +-
daqbufp2/src/test/binnedbinary.rs | 6 +-
daqbufp2/src/test/binnedjson.rs | 6 +-
daqbufp2/src/test/timeweightedjson.rs | 6 +-
disk/src/binned.rs | 15 +-
disk/src/binned/binnedfrompbv.rs | 3 +-
disk/src/binned/pbv.rs | 9 +-
disk/src/binned/prebinned.rs | 2 +-
disk/src/binned/query.rs | 295 +--------------------
disk/src/channelexec.rs | 1 +
httpret/src/channelarchiver.rs | 4 +-
httpret/src/httpret.rs | 25 +-
httpret/src/proxy.rs | 2 +-
netpod/src/netpod.rs | 8 +-
netpod/src/query.rs | 298 ++++++++++++++++++++-
taskrun/src/taskrun.rs | 10 +-
25 files changed, 974 insertions(+), 485 deletions(-)

diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs
index a78f740..f91a3fa 100644
--- a/archapp/src/archeng.rs
+++ b/archapp/src/archeng.rs
@@ -207,7 +207,7 @@ pub async fn channel_config_from_db(
pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> Result<ChannelConfigResponse, Error> {
let _timed = Timed::new("channel_config");
let mut type_info = None;
- let stream = blockrefstream::blockref_stream(q.channel.clone(), q.range.clone().clone(), conf.clone());
+ let stream = blockrefstream::blockref_stream(q.channel.clone(), q.range.clone(), q.expand, conf.database.clone());
let stream = Box::pin(stream);
let stream = blockstream::BlockStream::new(stream, q.range.clone(), 1);
let mut stream = stream;
@@ -234,7 +234,8 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R
if type_info.is_none() {
let timed_normal = Timed::new("channel_config NORMAL");
warn!("channel_config expand mode returned none");
- let stream = blockrefstream::blockref_stream(q.channel.clone(), q.range.clone().clone(), conf.clone());
+ let stream =
+ blockrefstream::blockref_stream(q.channel.clone(), q.range.clone(), q.expand, conf.database.clone());
let stream = Box::pin(stream);
let stream = blockstream::BlockStream::new(stream, q.range.clone(), 1);
let mut stream = stream;
diff --git a/archapp/src/archeng/blockrefstream.rs b/archapp/src/archeng/blockrefstream.rs
index 4ff2462..2c20fb7 100644
--- a/archapp/src/archeng/blockrefstream.rs
+++ b/archapp/src/archeng/blockrefstream.rs
@@ -1,21 +1,18 @@
use crate::archeng::backreadbuf::BackReadBuf;
-use crate::archeng::datablock::{read_data2, read_datafile_header2};
use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec};
use crate::archeng::indextree::{
- read_datablockref2, DataheaderPos, Dataref, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget,
+ read_datablockref2, Dataref, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget,
};
-use commonio::ringbuf::RingBuf;
use commonio::{open_read, StatsChannel};
use err::Error;
use futures_core::{Future, Stream};
-use items::WithLen;
#[allow(unused)]
use netpod::log::*;
-use netpod::{Channel, ChannelArchiver, NanoRange};
+use netpod::{Channel, Database, NanoRange};
#[allow(unused)]
use serde::Serialize;
use serde_json::Value as JsVal;
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::VecDeque;
use std::path::PathBuf;
use std::pin::Pin;
use tokio::fs::File;
@@ -41,37 +38,32 @@ enum Steps {
}

struct BlockrefStream {
- conf: ChannelArchiver,
+ dbconf: Database,
channel: Channel,
range: NanoRange,
+ expand: bool,
steps: Steps,
paths: VecDeque<PathBuf>,
file1: Option<BackReadBuf<File>>,
- file2: Option<RingBuf<File>>,
last_dp: u64,
last_dp2: u64,
- last_f2: String,
- last_dfhpos: DataheaderPos,
- dfnotfound: BTreeMap<String, bool>,
data_bytes_read: u64,
same_dfh_count: u64,
}

impl BlockrefStream {
- fn new(channel: Channel, range: NanoRange, conf: ChannelArchiver) -> Self {
+ fn new(channel: Channel, range: NanoRange, expand: bool, dbconf: Database) -> Self {
+ debug!("new BlockrefStream {:?}", range);
Self {
- conf,
+ dbconf,
channel,
range,
+ expand,
steps: Steps::Start,
paths: VecDeque::new(),
file1: None,
- file2: None,
last_dp: 0,
last_dp2: 0,
- last_f2: String::new(),
- last_dfhpos: DataheaderPos(u64::MAX),
- dfnotfound: BTreeMap::new(),
data_bytes_read: 0,
same_dfh_count: 0,
}
@@ -88,7 +80,7 @@ impl BlockrefStream {
)))
}
SelectIndexFile => {
- let dbc = database_connect(&self.conf.database).await?;
+ let dbc = database_connect(&self.dbconf).await?;
let sql = "select i.path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
let rows = dbc.query(sql, &[&self.channel.name()]).await?;
for row in rows {
@@ -113,13 +105,16 @@ impl BlockrefStream {
// For simplicity, simply read all storage classes linearly.
if let Some(path) = self.paths.pop_front() {
// TODO
- let mut file = open_read(path.clone().into(), stats).await?;
+ let mut file = open_read(path.clone().into(), stats).await.map_err(|e| {
+ error!("can not open {:?}", path);
+ e
+ })?;
let basics = IndexFileBasics::from_file(&path, &mut file, stats).await?;
let mut tree = basics
.rtree_for_channel(self.channel.name(), stats)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not in index files"))?;
- if let Some(iter) = tree.iter_range(self.range.clone(), stats).await? {
+ if let Some(iter) = tree.iter_range(self.range.clone(), self.expand, stats).await? {
debug!("SetupNextPath {:?}", path);
self.steps = ReadBlocks(iter, basics.hver().duplicate(), path.clone().into());
self.file1 = Some(BackReadBuf::new(file, 0, stats.clone()).await?);
@@ -136,76 +131,12 @@ impl BlockrefStream {
}
}
ReadBlocks(ref mut iter, ref hver, ref indexpath) => {
- // TODO stats
- let stats = &StatsChannel::dummy();
- // TODO I need to keep some datafile open.
let item = if let Some(rec) = iter.next().await? {
// TODO the iterator should actually return Dataref. We never expect child nodes here.
if let RecordTarget::Dataref(dp) = rec.target {
let f1 = self.file1.as_mut().unwrap();
let dref = read_datablockref2(f1, dp.clone(), hver.as_ref()).await?;
let dpath = indexpath.parent().unwrap().join(dref.file_name());
- // TODO Remember the index path, need it here for relative path.
- // TODO open datafile, relative path to index path.
- // TODO keep open when path does not change.
- let acc;
- let num_samples;
- if false {
- if let Some(_) = self.dfnotfound.get(dref.file_name()) {
- num_samples = 0;
- acc = 1;
- } else {
- if dref.file_name() == self.last_f2 {
- acc = 2;
- } else {
- match open_read(dpath.clone(), stats).await {
- Ok(f2) => {
- acc = 4;
- self.file2 = Some(
- RingBuf::new(f2, dref.data_header_pos().0, StatsChannel::dummy())
- .await?,
- );
- self.last_f2 = dref.file_name().into();
- }
- Err(_) => {
- acc = 3;
- self.file2 = None;
- }
- }
- };
- if let Some(f2) = self.file2.as_mut() {
- if dref.file_name() == self.last_f2 && dref.data_header_pos() == self.last_dfhpos {
- num_samples = 0;
- } else {
- self.last_dfhpos = dref.data_header_pos();
- let rp1 = f2.rp_abs();
- let dfheader = read_datafile_header2(f2, dref.data_header_pos()).await?;
- let data = read_data2(f2, &dfheader, self.range.clone(), false).await?;
- let rp2 = f2.rp_abs();
- self.data_bytes_read += rp2 - rp1;
- num_samples = dfheader.num_samples;
- if data.len() != num_samples as usize {
- if (data.len() as i64 - num_samples as i64).abs() < 4 {
- // TODO get always one event less than num_samples tells us.
- //warn!("small deviation {} vs {}", data.len(), num_samples);
- } else {
- return Err(Error::with_msg_no_trace(format!(
- "event count mismatch {} vs {}",
- data.len(),
- num_samples
- )));
- }
- }
- }
- } else {
- self.dfnotfound.insert(dref.file_name().into(), true);
- num_samples = 0;
- };
- }
- } else {
- acc = 6;
- num_samples = 0;
- }
let jsval = serde_json::to_value((
dp.0,
dp.0 as i64 - self.last_dp as i64,
dref.file_name(),
dref.data_header_pos.0,
dref.data_header_pos.0 as i64 - self.last_dp2 as i64,
dref.next().0,
- acc,
- num_samples,
))?;
self.last_dp = dp.0;
self.last_dp2 = dref.data_header_pos.0;
+ if rec.end.ns > self.range.end {
+ debug!("Have block end beyond range, stop");
+ self.steps = Done;
+ }
let bref = Blockref { dref, dpath };
+ trace!("emit {:?} Record range: {:?} TO {:?}", bref, rec.beg, rec.end);
BlockrefItem::Blockref(bref, jsval)
} else {
- panic!();
+ error!("not a Dataref target");
+ self.steps = Done;
+ BlockrefItem::JsVal(JsVal::String(format!("not a Dataref target")))
}
} else {
debug!(
@@ -252,7 +188,47 @@ impl UnfoldExec for BlockrefStream {
pub fn blockref_stream(
channel: Channel,
range: NanoRange,
- conf: ChannelArchiver,
+ expand: bool,
+ dbconf: Database,
) -> impl Stream<Item = Result<BlockrefItem, Error>> {
- unfold_stream(BlockrefStream::new(channel, range, conf.clone()))
+ unfold_stream(BlockrefStream::new(channel, range, expand, dbconf))
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use futures_util::StreamExt;
+ use netpod::timeunits::SEC;
+
+ #[test]
+ fn find_ref_1() -> Result<(), Error> {
+ let fut = async move {
+ let channel = Channel {
+ backend: "sls-archive".into(),
+ name: "X05DA-FE-WI1:TC1".into(),
+ };
+ use chrono::{DateTime, Utc};
+ let dtbeg: DateTime<Utc> = "2021-10-01T00:00:00Z".parse()?;
+ let dtend: DateTime<Utc> = "2021-10-10T00:00:00Z".parse()?;
+ fn tons(dt: &DateTime<Utc>) -> u64 {
+ dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
+ }
+ let range = NanoRange {
+ beg: tons(&dtbeg),
+ end: tons(&dtend),
+ };
+ let dbconf = Database {
+ host: "localhost".into(),
+ name: "testingdaq".into(),
+ user: "testingdaq".into(),
+ pass: "testingdaq".into(),
+ };
+ let mut refs = Box::pin(blockref_stream(channel, range, false, dbconf));
+ while let Some(item) = refs.next().await {
+ info!("Got ref {:?}", item);
+ }
+ Ok(())
+ };
+ taskrun::run(fut)
+ }
+}
diff --git a/archapp/src/archeng/blockstream.rs b/archapp/src/archeng/blockstream.rs
index 0c2c1b3..d88fb15 100644
--- a/archapp/src/archeng/blockstream.rs
+++ b/archapp/src/archeng/blockstream.rs
@@ -85,6 +85,7 @@ impl Future for FutA {
}
}

+#[derive(Debug)]
pub enum BlockItem {
EventsItem(EventsItem),
JsVal(JsVal),
@@ -117,7 +118,7 @@ impl<S> BlockStream<S> {
where
S: Stream<Item = Result<BlockrefItem, Error>> + Unpin,
{
- debug!("new BlockStream");
+ debug!("new BlockStream max_reads {} {:?}", max_reads, range);
Self {
inp,
inp_done: false,
@@ -311,11 +312,11 @@
}
}
if ev.len() == 1 {
- debug!("From {} {:?} {}", item.fname, item.path, item.dpos.0);
- debug!("See 1 event {:?}", Nanos::from_ns(ev.ts(0)));
+ trace!("From {} {:?} {}", item.fname, item.path, item.dpos.0);
+ trace!("See 1 event {:?}", Nanos::from_ns(ev.ts(0)));
} else if ev.len() > 1 {
- debug!("From {} {:?} {}", item.fname, item.path, item.dpos.0);
- debug!(
+ trace!("From {} {:?} {}", item.fname, item.path, item.dpos.0);
+ trace!(
"See {} events {:?} to {:?}",
ev.len(),
Nanos::from_ns(ev.ts(0)),
@@ -324,8 +325,8 @@
}
let mut contains_unordered = false;
for i in 0..ev.len() {
+ // TODO factor for performance.
let ts = ev.ts(i);
- debug!("\nSEE EVENT {:?}", Nanos::from_ns(ts));
if ts < self.ts_max {
contains_unordered = true;
if true {
@@ -447,3 +448,335 @@ impl<S> Drop for BlockStream<S> {
trace!("Drop {:?}", self);
}
}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::archeng::blockrefstream::blockref_stream;
+ use futures_util::StreamExt;
+ use items::{LogItem, RangeCompletableItem, StreamItem};
+ use netpod::{timeunits::SEC, Channel, Database};
+ use streams::rangefilter::RangeFilter;
+
+ struct EventCount {
+ pre: usize,
+ inside: usize,
+ post: usize,
+ raco: usize,
+ tss: Vec<u64>,
+ }
+
+ impl EventCount {
+ fn new() -> Self {
+ Self {
+ pre: 0,
+ inside: 0,
+ post: 0,
+ raco: 0,
+ tss: vec![],
+ }
+ }
+ }
+
+ async fn count_events(range: NanoRange, expand: bool, collect_ts: bool) -> Result<EventCount, Error> {
+ let channel = Channel {
+ backend: "sls-archive".into(),
+ name: "X05DA-FE-WI1:TC1".into(),
+ };
+ let dbconf = Database {
+ host: "localhost".into(),
+ name: "testingdaq".into(),
+ user: "testingdaq".into(),
+ pass: "testingdaq".into(),
+ };
+ let refs = Box::pin(blockref_stream(channel, range.clone(), expand, dbconf));
+ let blocks = BlockStream::new(refs, range.clone(), 1);
+ let events = blocks.map(|item| match item {
+ Ok(k) => match k {
+ BlockItem::EventsItem(k) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))),
+ BlockItem::JsVal(k) => Ok(StreamItem::Log(LogItem::quick(Level::TRACE, format!("{:?}", k)))),
+ },
+ Err(e) => Err(e),
+ });
+ let mut filtered = RangeFilter::new(events, range.clone(), expand);
+ let mut ret = EventCount::new();
+ while let Some(item) = filtered.next().await {
+ //info!("Got block {:?}", item);
+ match item {
+ Ok(item) => match item {
+ StreamItem::DataItem(item) => match item {
+ RangeCompletableItem::RangeComplete => {
+ ret.raco += 1;
+ }
+ RangeCompletableItem::Data(item) => {
+ let n = item.len();
+ for i in 0..n {
+ let ts = item.ts(i);
+ if ts < range.beg {
+ ret.pre += 1;
+ } else if ts < range.end {
+ ret.inside += 1;
+ } else {
+ ret.post += 1;
+ }
+ if collect_ts {
+ ret.tss.push(ts);
+ }
+ }
+ }
+ },
+ StreamItem::Log(_) => {}
+ StreamItem::Stats(_) => {}
+ },
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ }
+ Ok(ret)
+ }
+
+ #[test]
+ fn read_blocks_one_event() -> Result<(), Error> {
+ let _ev1 = "2021-10-03T09:57:59.939651334Z";
+ let _ev2 = "2021-10-03T09:58:59.940910313Z";
+ let _ev3 = "2021-10-03T09:59:59.940112431Z";
+ let ev1ts = 1633255079939651334;
+ let ev2ts = 1633255139940910313; + // [ev1..ev2] + let range = NanoRange { beg: ev1ts, end: ev2ts }; + let res = taskrun::run(count_events(range, false, true))?; + assert_eq!(res.pre, 0); + assert_eq!(res.inside, 1); + assert_eq!(res.post, 0); + assert_eq!(res.tss[0], ev1ts); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_one_event_expand() -> Result<(), Error> { + let ev1ts = 1633255079939651334; + let ev2ts = 1633255139940910313; + let range = NanoRange { beg: ev1ts, end: ev2ts }; + let res = taskrun::run(count_events(range, true, true))?; + assert_eq!(res.pre, 1); + assert_eq!(res.inside, 1); + assert_eq!(res.post, 1); + assert_eq!(res.tss[1], ev1ts); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_two_events() -> Result<(), Error> { + let ev1ts = 1633255079939651334; + let ev2ts = 1633255139940910313; + let range = NanoRange { + beg: ev1ts, + end: ev2ts + 1, + }; + let res = taskrun::run(count_events(range, false, true))?; + assert_eq!(res.pre, 0); + assert_eq!(res.inside, 2); + assert_eq!(res.post, 0); + assert_eq!(res.tss[0], ev1ts); + assert_eq!(res.tss[1], ev2ts); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_two_events_expand() -> Result<(), Error> { + let ev1ts = 1633255079939651334; + let ev2ts = 1633255139940910313; + let range = NanoRange { + beg: ev1ts, + end: ev2ts + 1, + }; + let res = taskrun::run(count_events(range, true, true))?; + assert_eq!(res.pre, 1); + assert_eq!(res.inside, 2); + assert_eq!(res.post, 1); + assert_eq!(res.tss[1], ev1ts); + assert_eq!(res.tss[2], ev2ts); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_many_1() -> Result<(), Error> { + use chrono::{DateTime, Utc}; + let _early = "2021-10-06T00:00:00Z"; + let _late = "2021-10-07T00:00:00Z"; + let dtbeg: DateTime = _early.parse()?; + let dtend: DateTime = _late.parse()?; + fn tons(dt: &DateTime) -> u64 { + dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64 + } + let range = NanoRange { + beg: tons(&dtbeg), + end: tons(&dtend), + }; + let res = taskrun::run(count_events(range, false, false))?; + assert_eq!(res.pre, 0); + assert_eq!(res.inside, 77); + assert_eq!(res.post, 0); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_many_2() -> Result<(), Error> { + use chrono::{DateTime, Utc}; + let _early = "2021-09-01T00:00:00Z"; + let _late = "2021-10-07T00:00:00Z"; + let dtbeg: DateTime = _early.parse()?; + let dtend: DateTime = _late.parse()?; + fn tons(dt: &DateTime) -> u64 { + dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64 + } + let range = NanoRange { + beg: tons(&dtbeg), + end: tons(&dtend), + }; + let res = taskrun::run(count_events(range, false, false))?; + assert_eq!(res.pre, 0); + assert_eq!(res.inside, 20328); + assert_eq!(res.post, 0); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_many_3() -> Result<(), Error> { + use chrono::{DateTime, Utc}; + let _early = "2021-08-01T00:00:00Z"; + let _late = "2021-10-07T00:00:00Z"; + let dtbeg: DateTime = _early.parse()?; + let dtend: DateTime = _late.parse()?; + fn tons(dt: &DateTime) -> u64 { + dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64 + } + let range = NanoRange { + beg: tons(&dtbeg), + end: tons(&dtend), + }; + let res = taskrun::run(count_events(range, false, false))?; + assert_eq!(res.pre, 0); + assert_eq!(res.inside, 35438); + assert_eq!(res.post, 0); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_many_3_expand() -> Result<(), Error> { + use 
chrono::{DateTime, Utc}; + let _early = "2021-08-01T00:00:00Z"; + let _late = "2021-10-07T00:00:00Z"; + let dtbeg: DateTime = _early.parse()?; + let dtend: DateTime = _late.parse()?; + fn tons(dt: &DateTime) -> u64 { + dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64 + } + let range = NanoRange { + beg: tons(&dtbeg), + end: tons(&dtend), + }; + let res = taskrun::run(count_events(range, true, false))?; + assert_eq!(res.pre, 1); + assert_eq!(res.inside, 35438); + assert_eq!(res.post, 1); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_many_4() -> Result<(), Error> { + use chrono::{DateTime, Utc}; + let _early = "2020-01-01T00:00:00Z"; + let _late = "2021-10-07T00:00:00Z"; + let dtbeg: DateTime = _early.parse()?; + let dtend: DateTime = _late.parse()?; + fn tons(dt: &DateTime) -> u64 { + dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64 + } + let range = NanoRange { + beg: tons(&dtbeg), + end: tons(&dtend), + }; + let res = taskrun::run(count_events(range, false, false))?; + assert_eq!(res.pre, 0); + assert_eq!(res.inside, 71146); + assert_eq!(res.post, 0); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_many_4_expand() -> Result<(), Error> { + use chrono::{DateTime, Utc}; + let _early = "2020-01-01T00:00:00Z"; + let _late = "2021-10-07T00:00:00Z"; + let dtbeg: DateTime = _early.parse()?; + let dtend: DateTime = _late.parse()?; + fn tons(dt: &DateTime) -> u64 { + dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64 + } + let range = NanoRange { + beg: tons(&dtbeg), + end: tons(&dtend), + }; + let res = taskrun::run(count_events(range, true, false))?; + assert_eq!(res.pre, 0); + assert_eq!(res.inside, 71146); + assert_eq!(res.post, 1); + assert_eq!(res.raco, 1); + Ok(()) + } + + #[test] + fn read_blocks_late() -> Result<(), Error> { + use chrono::{DateTime, Utc}; + let _early = "2021-10-01T00:00:00Z"; + let _late = "2021-12-01T00:00:00Z"; + let dtbeg: DateTime = _early.parse()?; + let dtend: DateTime = _late.parse()?; + fn tons(dt: &DateTime) -> u64 { + dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64 + } + let range = NanoRange { + beg: tons(&dtbeg), + end: tons(&dtend), + }; + let res = taskrun::run(count_events(range, false, false))?; + assert_eq!(res.pre, 0); + assert_eq!(res.inside, 3000); + assert_eq!(res.post, 0); + assert_eq!(res.raco, 0); + Ok(()) + } + + #[test] + fn read_blocks_late_expand() -> Result<(), Error> { + use chrono::{DateTime, Utc}; + let _early = "2021-10-01T00:00:00Z"; + let _late = "2021-12-01T00:00:00Z"; + let dtbeg: DateTime = _early.parse()?; + let dtend: DateTime = _late.parse()?; + fn tons(dt: &DateTime) -> u64 { + dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64 + } + let range = NanoRange { + beg: tons(&dtbeg), + end: tons(&dtend), + }; + let res = taskrun::run(count_events(range, true, false))?; + assert_eq!(res.pre, 1); + assert_eq!(res.inside, 3000); + assert_eq!(res.post, 0); + assert_eq!(res.raco, 0); + Ok(()) + } +} diff --git a/archapp/src/archeng/configs.rs b/archapp/src/archeng/configs.rs index 62b97b9..ae1fe73 100644 --- a/archapp/src/archeng/configs.rs +++ b/archapp/src/archeng/configs.rs @@ -232,6 +232,7 @@ impl Stream for ConfigStream { let q = ChannelConfigQuery { channel, range: NanoRange { beg, end }, + expand: true, }; let fut = super::channel_config(&q, &conf); let fut = tokio::time::timeout(Duration::from_millis(2000), fut); diff --git a/archapp/src/archeng/datablockstream.rs b/archapp/src/archeng/datablockstream.rs index 
4dea2cb..e6cfcbc 100644
--- a/archapp/src/archeng/datablockstream.rs
+++ b/archapp/src/archeng/datablockstream.rs
@@ -217,7 +217,7 @@ pub struct DatablockStream {
}

impl DatablockStream {
- pub fn for_channel_range(
+ pub fn _for_channel_range(
range: NanoRange,
channel: Channel,
base_dirs: VecDeque<PathBuf>,
@@ -332,6 +332,10 @@ mod test {

#[test]
fn read_file_basic_info() -> Result<(), Error> {
+ // TODO redo test
+ if true {
+ panic!();
+ }
let fut = async {
// file begin archive_X05DA_SH/20211001/20211001: 1633039259
// 1633145759
@@ -354,7 +358,7 @@
.map(PathBuf::from)
.collect();
let expand = false;
- let datablocks = DatablockStream::for_channel_range(range.clone(), channel, base_dirs, expand, u64::MAX);
+ let datablocks = DatablockStream::_for_channel_range(range.clone(), channel, base_dirs, expand, u64::MAX);
let filtered = RangeFilter::<_, EventsItem>::new(datablocks, range, expand);
let mut stream = filtered;
while let Some(block) = stream.next().await {
diff --git a/archapp/src/archeng/indextree.rs b/archapp/src/archeng/indextree.rs
index 084d92f..c718112 100644
--- a/archapp/src/archeng/indextree.rs
+++ b/archapp/src/archeng/indextree.rs
@@ -536,45 +536,115 @@ impl Rtree {
Ok(node)
}

- pub async fn iter_range(&mut self, range: NanoRange, stats: &StatsChannel) -> Result<Option<RecordIter>, Error> {
+ pub async fn iter_range(
+ &mut self,
+ range: NanoRange,
+ expand: bool,
+ stats: &StatsChannel,
+ ) -> Result<Option<RecordIter>, Error> {
+ debug!("iter_range search for {:?}", range);
// TODO RecordIter needs to know when to stop after range.
let ts1 = Instant::now();
let mut stack = VecDeque::new();
let mut node = self.read_node_at(self.root.clone(), stats).await?;
+ debug!("have root node {:?}", node);
let mut node_reads = 1;
- 'outer: loop {
- let nr = node.records.len();
- for (i, rec) in node.records.iter().enumerate() {
- if rec.beg.ns > range.beg {
- match &rec.target {
- RecordTarget::Child(child) => {
- trace!("found non-leaf match at {} / {}", i, nr);
- let child = child.clone();
- let nr = RtreeNodeAtRecord { node, rix: i };
- node = self.read_node_at(child, stats).await?;
- node_reads += 1;
- stack.push_back(nr);
- continue 'outer;
- }
- RecordTarget::Dataref(_dataref) => {
- trace!("found leaf match at {} / {}", i, nr);
- let nr = RtreeNodeAtRecord { node, rix: i };
- stack.push_back(nr);
- let ret = RecordIter {
- tree: self.reopen(stats).await?,
- stack,
- stats: stats.clone(),
- };
- let stats = TreeSearchStats::new(ts1, node_reads);
- trace!("iter_range done stats: {:?}", stats);
- return Ok(Some(ret));
+ if let Some(rec0) = node.records.first() {
+ if rec0.beg.ns > range.beg || (expand && rec0.beg.ns == range.beg) {
+ debug!("Start at begin of tree");
+ 'outer: loop {
+ let nrlen = node.records.len();
+ for (i, rec) in node.records.iter().enumerate() {
+ if true {
+ match &rec.target {
+ RecordTarget::Child(child) => {
+ trace!("found non-leaf match at {} / {}", i, nrlen);
+ let child = child.clone();
+ let nr = RtreeNodeAtRecord { node, rix: i };
+ node = self.read_node_at(child, stats).await?;
+ node_reads += 1;
+ stack.push_back(nr);
+ continue 'outer;
+ }
+ RecordTarget::Dataref(_dataref) => {
+ trace!("found leaf match at {} / {}", i, nrlen);
+ let nr = RtreeNodeAtRecord { node, rix: i };
+ stack.push_back(nr);
+ let ret = RecordIter {
+ tree: self.reopen(stats).await?,
+ stack,
+ range: range.clone(),
+ expand,
+ had_post: false,
+ done: false,
+ stats: stats.clone(),
+ };
+ let stats = TreeSearchStats::new(ts1, node_reads);
+ trace!("iter_range done stats: {:?}", stats);
+ return
Ok(Some(ret)); + } + } } } + //let stats = TreeSearchStats::new(ts1, node_reads); + //trace!("loop did not find something, iter_range done stats: {:?}", stats); + //return Ok(None); + return Err(Error::with_msg_no_trace("can not find the first leaf")); + } + } else { + debug!("Search within the tree"); + 'outer2: loop { + let nr = node.records.len(); + for (i, rec) in node.records.iter().enumerate().rev() { + if rec.beg.ns < range.beg || (!expand && rec.beg.ns == range.beg) { + match &rec.target { + RecordTarget::Child(child) => { + trace!("found non-leaf match at {} / {}", i, nr); + let child = child.clone(); + let nr = RtreeNodeAtRecord { node, rix: i }; + node = self.read_node_at(child, stats).await?; + node_reads += 1; + stack.push_back(nr); + continue 'outer2; + } + RecordTarget::Dataref(_dataref) => { + trace!("found leaf match at {} / {}", i, nr); + let nr = RtreeNodeAtRecord { node, rix: i }; + stack.push_back(nr); + let ret = RecordIter { + tree: self.reopen(stats).await?, + stack, + range: range.clone(), + expand, + had_post: false, + done: false, + stats: stats.clone(), + }; + let stats = TreeSearchStats::new(ts1, node_reads); + trace!("iter_range done stats: {:?}", stats); + return Ok(Some(ret)); + } + } + } + } + //let stats = TreeSearchStats::new(ts1, node_reads); + //trace!("loop did not find something, iter_range done stats: {:?}", stats); + //return Ok(None); + return Err(Error::with_msg_no_trace("expected to find a leaf")); } } - let stats = TreeSearchStats::new(ts1, node_reads); - trace!("iter_range done stats: {:?}", stats); - return Ok(None); + } else { + debug!("no records at all"); + let ret = RecordIter { + tree: self.reopen(stats).await?, + stack, + range: range.clone(), + expand, + had_post: false, + done: false, + stats: stats.clone(), + }; + return Ok(Some(ret)); } } @@ -595,11 +665,21 @@ impl Rtree { pub struct RecordIter { tree: Rtree, stack: VecDeque, + range: NanoRange, + expand: bool, + had_post: bool, + done: bool, stats: StatsChannel, } impl RecordIter { pub async fn next(&mut self) -> Result, Error> { + if self.done { + return Ok(None); + } + if self.had_post { + self.done = true; + } match self.stack.back_mut() { Some(nr) => { assert_eq!(nr.node.is_leaf, true); @@ -607,6 +687,13 @@ impl RecordIter { let ret = ret.clone(); if nr.advance()? { //trace!("still more records here {} / {}", nr.rix, nr.node.records.len()); + let beg2 = nr.rec().unwrap().beg.ns; + if beg2 >= self.range.end { + self.had_post = true; + if !self.expand { + self.done = true; + } + } } else { loop { if self.stack.pop_back().is_none() { @@ -636,6 +723,13 @@ impl RecordIter { RecordTarget::Dataref(_) => { trace!("loop B is-leaf"); // done, we've positioned the next result. + let beg2 = n2.rec().unwrap().beg.ns; + if beg2 >= self.range.end { + self.had_post = true; + if !self.expand { + self.done = true; + } + } break; } } @@ -697,6 +791,7 @@ impl TreeSearchStats { } } +// TODO get rid of this in favor of RecordIter. pub async fn search_record( file: &mut File, rtree_m: usize, @@ -732,6 +827,7 @@ pub async fn search_record( } } +// TODO get rid of this in favor of RecordIter. pub async fn search_record_expand_try( file: &mut File, rtree_m: usize, @@ -770,6 +866,7 @@ pub async fn search_record_expand_try( } } +// TODO get rid of this in favor of RecordIter. pub async fn search_record_expand( file: &mut File, rtree_m: usize, @@ -1111,7 +1208,7 @@ mod test { .await? 
.ok_or_else(|| Error::with_msg("no tree found for channel"))?; let mut iter = tree - .iter_range(range, stats) + .iter_range(range, false, stats) .await? .ok_or_else(|| Error::with_msg("could not position iterator"))?; let mut i1 = 0; @@ -1148,7 +1245,7 @@ mod test { .await? .ok_or_else(|| Error::with_msg("no tree found for channel"))?; let mut iter = tree - .iter_range(range, stats) + .iter_range(range, false, stats) .await? .ok_or_else(|| Error::with_msg("could not position iterator"))?; let mut i1 = 0; diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs index 5490415..2e1153d 100644 --- a/archapp/src/archeng/pipe.rs +++ b/archapp/src/archeng/pipe.rs @@ -23,12 +23,18 @@ pub async fn make_event_pipe( let q = ChannelConfigQuery { channel: evq.channel.clone(), range: evq.range.clone(), + expand: evq.agg_kind.need_expand(), }; crate::archeng::channel_config_from_db(&q, &conf).await? }; debug!("Channel config: {:?}", channel_config); use crate::archeng::blockstream::BlockItem; - let refs = blockref_stream(evq.channel.clone(), evq.range.clone(), conf.clone()); + let refs = blockref_stream( + evq.channel.clone(), + evq.range.clone(), + evq.agg_kind.need_expand(), + conf.database.clone(), + ); let blocks = BlockStream::new(Box::pin(refs), evq.range.clone(), 1); let blocks = blocks.map(|k| match k { Ok(item) => match item { @@ -84,10 +90,13 @@ pub async fn make_event_pipe( Ok(Box::pin(ret)) } -pub async fn make_event_pipe1( +pub async fn _make_event_pipe1( evq: &RawEventsQuery, conf: ChannelArchiver, ) -> Result> + Send>>, Error> { + // TODO unused + err::todo(); + let range = evq.range.clone(); let channel = evq.channel.clone(); let expand = evq.agg_kind.need_expand(); @@ -99,11 +108,12 @@ pub async fn make_event_pipe1( let q = ChannelConfigQuery { channel: channel.clone(), range: range.clone(), + expand: false, }; crate::archeng::channel_config_from_db(&q, &conf).await? }; - let data = DatablockStream::for_channel_range( + let data = DatablockStream::_for_channel_range( range.clone(), channel, conf.data_base_paths.clone().into(), diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 997e671..b4e9d09 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -82,6 +82,7 @@ impl FrameMaker { #[allow(unused_macros)] macro_rules! events_item_to_sitemty { ($ei:expr, $t1:ident, $t2:ident, $t3:ident) => {{ + let combo = format!("t1 {} t2 {} t3 {}", stringify!($t1), stringify!($t2), stringify!($t3)); let ret = match $ei { Ok(k) => match k { StreamItem::DataItem(k) => match k { @@ -95,13 +96,22 @@ macro_rules! events_item_to_sitemty { // match h { $t2::$t3(h) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))), - _ => panic!(), + _ => { + warn!("case AA {}", combo); + panic!() + } } } - _ => panic!(), + _ => { + warn!("case BB {}", combo); + panic!() + } } } - _ => panic!(), + _ => { + warn!("case CC {}", combo); + panic!() + } } } RangeCompletableItem::RangeComplete => { @@ -120,6 +130,16 @@ macro_rules! events_item_to_sitemty { macro_rules! arm2 { ($item:expr, $t1:ident, $t2:ident, $t3:ident, $t4:ident, $t5:ident, $sty1:ident, $sty2:ident) => {{ type T1 = $t1<$sty1>; + let combo = format!( + "t1 {} t2 {} t3 {} t4 {} t5 {} sty1 {} sty2 {}", + stringify!($t1), + stringify!($t2), + stringify!($t3), + stringify!($t4), + stringify!($t5), + stringify!($sty1), + stringify!($sty2) + ); let ret: Sitemty = match $item { Ok(k) => match k { StreamItem::DataItem(k) => match k { @@ -133,13 +153,18 @@ macro_rules! 
arm2 { // Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) } - _ => panic!(), + _ => { + warn!("unclear what to do A {}", combo); + err::todoval() + } }, - - _ => panic!(), + _ => { + warn!("unclear what to do B {}", combo); + err::todoval() + } }, _ => { - error!("unexpected arm2 case"); + error!("unexpected arm2 case {}", combo); err::todoval() } }, @@ -157,7 +182,10 @@ macro_rules! arm1 { ($item:expr, $sty1:ident, $sty2:ident, $shape:expr, $ak:expr) => {{ match $shape { Shape::Scalar => match $ak { - AggKind::EventBlobs => panic!(), + AggKind::EventBlobs => { + warn!("arm1 unhandled EventBlobs"); + panic!() + } AggKind::Plain => arm2!( $item, EventValues, @@ -168,6 +196,16 @@ macro_rules! arm1 { $sty1, $sty2 ), + AggKind::TimeWeightedScalar => arm2!( + $item, + EventValues, + XBinnedEvents, + XBinnedEvents, + Scalar, + ScalarPlainEvents, + $sty1, + $sty2 + ), AggKind::DimXBins1 => arm2!( $item, EventValues, @@ -178,7 +216,6 @@ macro_rules! arm1 { $sty1, $sty2 ), - AggKind::TimeWeightedScalar => panic!(), AggKind::DimXBinsN(_) => arm2!( $item, EventValues, @@ -191,7 +228,10 @@ macro_rules! arm1 { ), }, Shape::Wave(_) => match $ak { - AggKind::EventBlobs => panic!(), + AggKind::EventBlobs => { + warn!("arm1 unhandled EventBlobs"); + panic!() + } AggKind::Plain => arm2!( $item, WaveEvents, @@ -202,7 +242,16 @@ macro_rules! arm1 { $sty1, $sty2 ), - AggKind::TimeWeightedScalar => panic!(), + AggKind::TimeWeightedScalar => arm2!( + $item, + XBinnedScalarEvents, + XBinnedEvents, + XBinnedEvents, + SingleBinWave, + SingleBinWaveEvents, + $sty1, + $sty2 + ), AggKind::DimXBins1 => arm2!( $item, XBinnedScalarEvents, diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 8f75fd3..459acc6 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -1,9 +1,9 @@ use chrono::{DateTime, Duration, Utc}; use clap::Clap; use daqbuffer::cli::{ClientType, Opts, SubCmd}; -use disk::binned::query::CacheUsage; use err::Error; use netpod::log::*; +use netpod::query::CacheUsage; use netpod::{NodeConfig, NodeConfigCached, ProxyConfig}; use tokio::fs::File; use tokio::io::AsyncReadExt; diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index 944c9b0..32c4732 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -1,5 +1,4 @@ use chrono::{DateTime, Utc}; -use disk::binned::query::{BinnedQuery, CacheUsage}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; @@ -8,8 +7,9 @@ use http::StatusCode; use hyper::Body; use items::xbinnedwaveevents::XBinnedWaveEvents; use items::{FrameType, Sitemty, StreamItem}; -use netpod::log::*; -use netpod::{AggKind, AppendToUrl, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET}; +use netpod::query::{BinnedQuery, CacheUsage}; +use netpod::{log::*, AppendToUrl}; +use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET}; use url::Url; pub async fn status(host: String, port: u16) -> Result<(), Error> { diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index a5c737d..41253dc 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -1,6 +1,5 @@ use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; -use disk::binned::query::{BinnedQuery, CacheUsage}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; @@ -9,8 +8,9 @@ use http::StatusCode; use hyper::Body; use 
items::minmaxavgbins::MinMaxAvgBins; use items::{FrameType, RangeCompletableItem, Sitemty, StatsItem, StreamItem, SubFrId, WithLen}; -use netpod::log::*; -use netpod::{AggKind, AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET}; +use netpod::query::{BinnedQuery, CacheUsage}; +use netpod::{log::*, AppendToUrl}; +use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET}; use serde::de::DeserializeOwned; use std::fmt; use std::future::ready; diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs index a775e81..00adb16 100644 --- a/daqbufp2/src/test/binnedjson.rs +++ b/daqbufp2/src/test/binnedjson.rs @@ -1,11 +1,11 @@ use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; -use disk::binned::query::{BinnedQuery, CacheUsage}; use err::Error; use http::StatusCode; use hyper::Body; -use netpod::log::*; -use netpod::{AggKind, AppendToUrl, Channel, Cluster, NanoRange, APP_JSON}; +use netpod::query::{BinnedQuery, CacheUsage}; +use netpod::{log::*, AppendToUrl}; +use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON}; use std::time::Duration; use url::Url; diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index ffd4535..84ac7dd 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -1,11 +1,11 @@ use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; -use disk::binned::query::{BinnedQuery, CacheUsage}; use err::Error; use http::StatusCode; use hyper::Body; -use netpod::log::*; -use netpod::{AggKind, AppendToUrl, Channel, Cluster, NanoRange, APP_JSON}; +use netpod::query::{BinnedQuery, CacheUsage}; +use netpod::{log::*, AppendToUrl}; +use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON}; use std::time::Duration; use url::Url; diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 7135e9a..330f62b 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,6 +1,11 @@ +pub mod binnedfrompbv; +pub mod dim1; +pub mod pbv; +pub mod prebinned; +pub mod query; + use crate::agg::binnedt::TBinnerStream; use crate::binned::binnedfrompbv::BinnedFromPreBinned; -use crate::binned::query::BinnedQuery; use crate::binnedstream::BoxedStream; use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction}; use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; @@ -17,7 +22,7 @@ use items::{ Sitemty, StreamItem, TimeBinnableType, WithLen, }; use netpod::log::*; -use netpod::query::RawEventsQuery; +use netpod::query::{BinnedQuery, RawEventsQuery}; use netpod::{ x_bin_count, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, Shape, }; @@ -27,12 +32,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -pub mod binnedfrompbv; -pub mod dim1; -pub mod pbv; -pub mod prebinned; -pub mod query; - pub struct BinnedStreamRes { pub binned_stream: BoxedStream>, Error>>, pub range: BinnedRange, diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 77af6e2..3c7644a 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -1,5 +1,5 @@ use crate::agg::binnedt::TBinnerStream; -use crate::binned::query::{CacheUsage, PreBinnedQuery}; +use crate::binned::query::PreBinnedQuery; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use err::Error; @@ -9,6 +9,7 @@ use http::{StatusCode, Uri}; use 
items::frame::decode_frame; use items::{FrameType, RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType}; use netpod::log::*; +use netpod::query::CacheUsage; use netpod::{ x_bin_count, AggKind, AppendToUrl, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, Shape, diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 8c24ba8..46da970 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -1,6 +1,6 @@ use crate::agg::binnedt::TBinnerStream; use crate::binned::binnedfrompbv::FetchedPreBinned; -use crate::binned::query::{CacheUsage, PreBinnedQuery}; +use crate::binned::query::PreBinnedQuery; use crate::binned::WithLen; use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, WrittenPbCache}; use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; @@ -15,7 +15,7 @@ use items::{ ReadableFromFile, Sitemty, StreamItem, TimeBinnableType, }; use netpod::log::*; -use netpod::query::RawEventsQuery; +use netpod::query::{CacheUsage, RawEventsQuery}; use netpod::{ x_bin_count, AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, Shape, }; @@ -129,8 +129,10 @@ where let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()).map(|k| { info!( "setup_merged_from_remotes, MergedFromRemotes yields {:?}", - show_event_basic_info(&k) + //show_event_basic_info(&k) + "TODO show_event_basic_info" ); + k }); let ret = TBinnerStream::<_, ::Output>::new( s, @@ -208,6 +210,7 @@ where } fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> { + info!("try_setup_fetch_prebinned_higher_res"); let range = self.query.patch().patch_range(); match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) { Ok(Some(range)) => { diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 0ea051a..fbb52a5 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -176,10 +176,10 @@ pub async fn pre_binned_bytes_for_http( )); return Err(err); } - let q = ChannelConfigQuery { channel: query.channel().clone(), range: query.patch().patch_range(), + expand: query.agg_kind().need_expand(), }; let conf = httpclient::get_channel_config(&q, node_config).await?; let ret = make_num_pipeline( diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index 7b2adb0..d8164b6 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -1,13 +1,8 @@ -use chrono::{DateTime, TimeZone, Utc}; use err::Error; use http::request::Parts; -use netpod::log::*; -use netpod::{ - channel_from_pairs, get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, - NanoRange, PreBinnedPatchCoord, ToNanos, -}; +use netpod::query::{agg_kind_from_binning_scheme, binning_scheme_append_to_url, CacheUsage}; +use netpod::{channel_from_pairs, AggKind, AppendToUrl, ByteSize, Channel, PreBinnedPatchCoord}; use std::collections::BTreeMap; -use std::time::Duration; use url::Url; #[derive(Clone, Debug)] @@ -134,289 +129,3 @@ impl AppendToUrl for PreBinnedQuery { g.append_pair("reportError", &format!("{}", self.report_error())); } } - -#[derive(Clone, Debug)] -pub enum CacheUsage { - Use, - Ignore, - Recreate, -} - -impl CacheUsage { - pub fn query_param_value(&self) -> String { - match self { - CacheUsage::Use => "use", - CacheUsage::Ignore => "ignore", - CacheUsage::Recreate => "recreate", - } - .into() - } - - pub fn from_pairs(pairs: &BTreeMap) -> Result { - 
let ret = pairs.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| { - if k == "use" { - Ok(CacheUsage::Use) - } else if k == "ignore" { - Ok(CacheUsage::Ignore) - } else if k == "recreate" { - Ok(CacheUsage::Recreate) - } else { - Err(Error::with_msg(format!("unexpected cacheUsage {:?}", k)))? - } - })?; - Ok(ret) - } - - pub fn from_string(s: &str) -> Result { - let ret = if s == "ignore" { - CacheUsage::Ignore - } else if s == "recreate" { - CacheUsage::Recreate - } else if s == "use" { - CacheUsage::Use - } else { - return Err(Error::with_msg(format!("can not interpret cache usage string: {}", s))); - }; - Ok(ret) - } -} - -impl std::fmt::Display for CacheUsage { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(fmt, "{}", self.query_param_value()) - } -} - -#[derive(Clone, Debug)] -pub struct BinnedQuery { - channel: Channel, - range: NanoRange, - bin_count: u32, - agg_kind: AggKind, - cache_usage: CacheUsage, - disk_io_buffer_size: usize, - disk_stats_every: ByteSize, - report_error: bool, - timeout: Duration, - abort_after_bin_count: u32, - do_log: bool, -} - -impl BinnedQuery { - pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: AggKind) -> Self { - Self { - channel, - range, - bin_count, - agg_kind, - cache_usage: CacheUsage::Use, - disk_io_buffer_size: 1024 * 4, - disk_stats_every: ByteSize(1024 * 1024 * 4), - report_error: false, - timeout: Duration::from_millis(2000), - abort_after_bin_count: 0, - do_log: false, - } - } - - pub fn range(&self) -> &NanoRange { - &self.range - } - - pub fn channel(&self) -> &Channel { - &self.channel - } - - pub fn bin_count(&self) -> u32 { - self.bin_count - } - - pub fn agg_kind(&self) -> &AggKind { - &self.agg_kind - } - - pub fn cache_usage(&self) -> &CacheUsage { - &self.cache_usage - } - - pub fn disk_stats_every(&self) -> &ByteSize { - &self.disk_stats_every - } - - pub fn disk_io_buffer_size(&self) -> usize { - self.disk_io_buffer_size - } - - pub fn report_error(&self) -> bool { - self.report_error - } - - pub fn timeout(&self) -> Duration { - self.timeout - } - - pub fn abort_after_bin_count(&self) -> u32 { - self.abort_after_bin_count - } - - pub fn do_log(&self) -> bool { - self.do_log - } - - pub fn set_cache_usage(&mut self, k: CacheUsage) { - self.cache_usage = k; - } - - pub fn set_disk_stats_every(&mut self, k: ByteSize) { - self.disk_stats_every = k; - } - - pub fn set_timeout(&mut self, k: Duration) { - self.timeout = k; - } - - pub fn set_disk_io_buffer_size(&mut self, k: usize) { - self.disk_io_buffer_size = k; - } -} - -impl HasBackend for BinnedQuery { - fn backend(&self) -> &str { - &self.channel.backend - } -} - -impl HasTimeout for BinnedQuery { - fn timeout(&self) -> Duration { - self.timeout.clone() - } -} - -impl FromUrl for BinnedQuery { - fn from_url(url: &Url) -> Result { - let pairs = get_url_query_pairs(url); - let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; - let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; - let disk_stats_every = pairs.get("diskStatsEveryKb").map_or("2000", |k| k); - let disk_stats_every = disk_stats_every - .parse() - .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?; - let ret = Self { - channel: channel_from_pairs(&pairs)?, - range: NanoRange { - beg: beg_date.parse::>()?.to_nanos(), - end: end_date.parse::>()?.to_nanos(), - }, - bin_count: pairs - .get("binCount") - .ok_or(Error::with_msg("missing binCount"))? 
- .parse() - .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?, - agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1), - cache_usage: CacheUsage::from_pairs(&pairs)?, - disk_io_buffer_size: pairs - .get("diskIoBufferSize") - .map_or("4096", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, - disk_stats_every: ByteSize::kb(disk_stats_every), - report_error: pairs - .get("reportError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?, - timeout: pairs - .get("timeout") - .map_or("2000", |k| k) - .parse::() - .map(|k| Duration::from_millis(k)) - .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?, - abort_after_bin_count: pairs - .get("abortAfterBinCount") - .map_or("0", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse abortAfterBinCount {:?}", e)))?, - do_log: pairs - .get("doLog") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?, - }; - debug!("BinnedQuery::from_url {:?}", ret); - Ok(ret) - } -} - -impl AppendToUrl for BinnedQuery { - fn append_to_url(&self, url: &mut Url) { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - { - let mut g = url.query_pairs_mut(); - g.append_pair("cacheUsage", &self.cache_usage.to_string()); - g.append_pair("channelBackend", &self.channel.backend); - g.append_pair("channelName", &self.channel.name); - g.append_pair("binCount", &format!("{}", self.bin_count)); - g.append_pair( - "begDate", - &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), - ); - g.append_pair( - "endDate", - &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), - ); - } - { - binning_scheme_append_to_url(&self.agg_kind, url); - } - { - let mut g = url.query_pairs_mut(); - g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); - g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); - g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); - g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count)); - g.append_pair("doLog", &format!("{}", self.do_log)); - } - } -} - -fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) { - let mut g = url.query_pairs_mut(); - match agg_kind { - AggKind::EventBlobs => panic!(), - AggKind::TimeWeightedScalar => { - g.append_pair("binningScheme", "timeWeightedScalar"); - } - AggKind::Plain => { - g.append_pair("binningScheme", "fullValue"); - } - AggKind::DimXBins1 => { - g.append_pair("binningScheme", "unweightedScalar"); - } - AggKind::DimXBinsN(n) => { - g.append_pair("binningScheme", "toScalarX"); - g.append_pair("binnedXcount", &format!("{}", n)); - } - } -} - -fn agg_kind_from_binning_scheme(pairs: &BTreeMap) -> Result { - let key = "binningScheme"; - let s = pairs - .get(key) - .map_or(Err(Error::with_msg(format!("can not find {}", key))), |k| Ok(k))?; - let ret = if s == "eventBlobs" { - AggKind::EventBlobs - } else if s == "fullValue" { - AggKind::Plain - } else if s == "timeWeightedScalar" { - AggKind::TimeWeightedScalar - } else if s == "unweightedScalar" { - AggKind::DimXBins1 - } else if s == "binnedX" { - let u = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; - AggKind::DimXBinsN(u) - } else { - return Err(Error::with_msg("can not extract binningScheme")); - }; - Ok(ret) -} diff --git a/disk/src/channelexec.rs 
b/disk/src/channelexec.rs index 9990666..ec489ba 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -197,6 +197,7 @@ where let q = ChannelConfigQuery { channel: channel.clone(), range: range.clone(), + expand: agg_kind.need_expand(), }; let conf = httpclient::get_channel_config(&q, node_config).await?; let ret = channel_exec_config( diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs index 0a7e730..736d14a 100644 --- a/httpret/src/channelarchiver.rs +++ b/httpret/src/channelarchiver.rs @@ -280,7 +280,7 @@ impl BlockRefStream { //name: "ARIDI-PCT:CURRENT".into(), }; use archapp_wrap::archapp::archeng; - let s = archeng::blockrefstream::blockref_stream(channel, range, conf.clone()); + let s = archeng::blockrefstream::blockref_stream(channel, range, true, conf.database.clone()); let s = s.map(|item| match item { Ok(item) => { use archeng::blockrefstream::BlockrefItem::*; @@ -346,7 +346,7 @@ impl BlockStream { name: channel_name, }; use archapp_wrap::archapp::archeng; - let s = archeng::blockrefstream::blockref_stream(channel, range.clone(), conf.clone()); + let s = archeng::blockrefstream::blockref_stream(channel, range.clone(), true, conf.database.clone()); let s = Box::pin(s); let s = archeng::blockstream::BlockStream::new(s, range.clone(), read_queue); let s = s.map(|item| match item { diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 2ef45bb..603701b 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -1,7 +1,14 @@ +pub mod api1; +pub mod channelarchiver; +pub mod gather; +pub mod proxy; +pub mod pulsemap; +pub mod search; + use crate::gather::gather_get_json; use crate::pulsemap::UpdateTask; use bytes::Bytes; -use disk::binned::query::{BinnedQuery, PreBinnedQuery}; +use disk::binned::query::PreBinnedQuery; use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery}; use err::Error; use future::Future; @@ -13,11 +20,10 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::log::*; +use netpod::query::BinnedQuery; use netpod::timeunits::SEC; -use netpod::{ - channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, APP_JSON, - APP_JSON_LINES, APP_OCTET, -}; +use netpod::{channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached}; +use netpod::{APP_JSON, APP_JSON_LINES, APP_OCTET}; use nodenet::conn::events_service; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; @@ -28,13 +34,6 @@ use tracing::field::Empty; use tracing::Instrument; use url::Url; -pub mod api1; -pub mod channelarchiver; -pub mod gather; -pub mod proxy; -pub mod pulsemap; -pub mod search; - fn proxy_mark() -> &'static str { "7c5e408a" } @@ -53,9 +52,11 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { // TODO send to logstash debug!("new connection from {:?}", conn.remote_addr()); let node_config = node_config.clone(); + let addr = conn.remote_addr(); async move { Ok::<_, Error>(service_fn({ move |req| { + info!("REQUEST {:?} {:?}", addr, req.uri()); let f = http_service(req, node_config.clone()); Cont { f: Box::pin(f) } } diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index cdd8d5a..f843f34 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -3,7 +3,6 @@ pub mod api4; use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json_2_v1, proxy_distribute_v1}; use crate::gather::{gather_get_json_generic, SubRes}; 
use crate::{api_1_docs, api_4_docs, response, Cont}; -use disk::binned::query::BinnedQuery; use disk::events::PlainEventsJsonQuery; use err::Error; use futures_core::Stream; @@ -14,6 +13,7 @@ use hyper::{Body, Request, Response, Server}; use hyper_tls::HttpsConnector; use itertools::Itertools; use netpod::log::*; +use netpod::query::BinnedQuery; use netpod::{ AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, FromUrl, HasBackend, HasTimeout, ProxyConfig, APP_JSON, diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index ab9e5bd..0452733 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -994,7 +994,7 @@ impl fmt::Display for AggKind { } impl fmt::Debug for AggKind { - fn fmt(&self, fmt: &mut fmt::Formatter) -> std::fmt::Result { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt::Display::fmt(self, fmt) } } @@ -1341,6 +1341,7 @@ The presence of a configuration in some range does not imply that there is any d pub struct ChannelConfigQuery { pub channel: Channel, pub range: NanoRange, + pub expand: bool, } impl HasBackend for ChannelConfigQuery { @@ -1360,12 +1361,14 @@ impl FromUrl for ChannelConfigQuery { let pairs = get_url_query_pairs(url); let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; + let expand = pairs.get("expand").map(|s| s == "true").unwrap_or(false); let ret = Self { channel: channel_from_pairs(&pairs)?, range: NanoRange { beg: beg_date.parse::>()?.to_nanos(), end: end_date.parse::>()?.to_nanos(), }, + expand, }; Ok(ret) } @@ -1385,6 +1388,9 @@ impl AppendToUrl for ChannelConfigQuery { "endDate", &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), ); + if self.expand { + g.append_pair("expand", "true"); + } } } diff --git a/netpod/src/query.rs b/netpod/src/query.rs index efbf6f3..28342f8 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -1,5 +1,67 @@ -use crate::{AggKind, Channel, NanoRange}; +use crate::log::*; +use crate::{ + channel_from_pairs, get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, + NanoRange, ToNanos, +}; +use chrono::{DateTime, TimeZone, Utc}; +use err::Error; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::fmt; +use std::time::Duration; +use url::Url; + +#[derive(Clone, Debug)] +pub enum CacheUsage { + Use, + Ignore, + Recreate, +} + +impl CacheUsage { + pub fn query_param_value(&self) -> String { + match self { + CacheUsage::Use => "use", + CacheUsage::Ignore => "ignore", + CacheUsage::Recreate => "recreate", + } + .into() + } + + pub fn from_pairs(pairs: &BTreeMap) -> Result { + let ret = pairs.get("cacheUsage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| { + if k == "use" { + Ok(CacheUsage::Use) + } else if k == "ignore" { + Ok(CacheUsage::Ignore) + } else if k == "recreate" { + Ok(CacheUsage::Recreate) + } else { + Err(Error::with_msg(format!("unexpected cacheUsage {:?}", k)))? 
+ }
+ })?;
+ Ok(ret)
+ }
+
+ pub fn from_string(s: &str) -> Result<CacheUsage, Error> {
+ let ret = if s == "ignore" {
+ CacheUsage::Ignore
+ } else if s == "recreate" {
+ CacheUsage::Recreate
+ } else if s == "use" {
+ CacheUsage::Use
+ } else {
+ return Err(Error::with_msg(format!("can not interpret cache usage string: {}", s)));
+ };
+ Ok(ret)
+ }
+}
+
+impl fmt::Display for CacheUsage {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "{}", self.query_param_value())
+ }
+}

/**
Query parameters to request (optionally) X-processed, but not T-processed events.
@@ -12,3 +74,237 @@ pub struct RawEventsQuery {
pub disk_io_buffer_size: usize,
pub do_decompress: bool,
}
+
+#[derive(Clone, Debug)]
+pub struct BinnedQuery {
+ channel: Channel,
+ range: NanoRange,
+ bin_count: u32,
+ agg_kind: AggKind,
+ cache_usage: CacheUsage,
+ disk_io_buffer_size: usize,
+ disk_stats_every: ByteSize,
+ report_error: bool,
+ timeout: Duration,
+ abort_after_bin_count: u32,
+ do_log: bool,
+}
+
+impl BinnedQuery {
+ pub fn new(channel: Channel, range: NanoRange, bin_count: u32, agg_kind: AggKind) -> Self {
+ Self {
+ channel,
+ range,
+ bin_count,
+ agg_kind,
+ cache_usage: CacheUsage::Use,
+ disk_io_buffer_size: 1024 * 4,
+ disk_stats_every: ByteSize(1024 * 1024 * 4),
+ report_error: false,
+ timeout: Duration::from_millis(2000),
+ abort_after_bin_count: 0,
+ do_log: false,
+ }
+ }
+
+ pub fn range(&self) -> &NanoRange {
+ &self.range
+ }
+
+ pub fn channel(&self) -> &Channel {
+ &self.channel
+ }
+
+ pub fn bin_count(&self) -> u32 {
+ self.bin_count
+ }
+
+ pub fn agg_kind(&self) -> &AggKind {
+ &self.agg_kind
+ }
+
+ pub fn cache_usage(&self) -> &CacheUsage {
+ &self.cache_usage
+ }
+
+ pub fn disk_stats_every(&self) -> &ByteSize {
+ &self.disk_stats_every
+ }
+
+ pub fn disk_io_buffer_size(&self) -> usize {
+ self.disk_io_buffer_size
+ }
+
+ pub fn report_error(&self) -> bool {
+ self.report_error
+ }
+
+ pub fn timeout(&self) -> Duration {
+ self.timeout
+ }
+
+ pub fn abort_after_bin_count(&self) -> u32 {
+ self.abort_after_bin_count
+ }
+
+ pub fn do_log(&self) -> bool {
+ self.do_log
+ }
+
+ pub fn set_cache_usage(&mut self, k: CacheUsage) {
+ self.cache_usage = k;
+ }
+
+ pub fn set_disk_stats_every(&mut self, k: ByteSize) {
+ self.disk_stats_every = k;
+ }
+
+ pub fn set_timeout(&mut self, k: Duration) {
+ self.timeout = k;
+ }
+
+ pub fn set_disk_io_buffer_size(&mut self, k: usize) {
+ self.disk_io_buffer_size = k;
+ }
+}
+
+impl HasBackend for BinnedQuery {
+ fn backend(&self) -> &str {
+ &self.channel.backend
+ }
+}
+
+impl HasTimeout for BinnedQuery {
+ fn timeout(&self) -> Duration {
+ self.timeout.clone()
+ }
+}
+
+impl FromUrl for BinnedQuery {
+ fn from_url(url: &Url) -> Result<Self, Error> {
+ let pairs = get_url_query_pairs(url);
+ let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
+ let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
+ let disk_stats_every = pairs.get("diskStatsEveryKb").map_or("2000", |k| k);
+ let disk_stats_every = disk_stats_every
+ .parse()
+ .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
+ let ret = Self {
+ channel: channel_from_pairs(&pairs)?,
+ range: NanoRange {
+ beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
+ end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
+ },
+ bin_count: pairs
+ .get("binCount")
+ .ok_or(Error::with_msg("missing binCount"))?
+ .parse()
+ .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
+ agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1),
+ cache_usage: CacheUsage::from_pairs(&pairs)?,
+ disk_io_buffer_size: pairs
+ .get("diskIoBufferSize")
+ .map_or("4096", |k| k)
+ .parse()
+ .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?,
+ disk_stats_every: ByteSize::kb(disk_stats_every),
+ report_error: pairs
+ .get("reportError")
+ .map_or("false", |k| k)
+ .parse()
+ .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
+ timeout: pairs
+ .get("timeout")
+ .map_or("2000", |k| k)
+ .parse::<u64>()
+ .map(|k| Duration::from_millis(k))
+ .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
+ abort_after_bin_count: pairs
+ .get("abortAfterBinCount")
+ .map_or("0", |k| k)
+ .parse()
+ .map_err(|e| Error::with_msg(format!("can not parse abortAfterBinCount {:?}", e)))?,
+ do_log: pairs
+ .get("doLog")
+ .map_or("false", |k| k)
+ .parse()
+ .map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?,
+ };
+ debug!("BinnedQuery::from_url {:?}", ret);
+ Ok(ret)
+ }
+}
+
+impl AppendToUrl for BinnedQuery {
+ fn append_to_url(&self, url: &mut Url) {
+ let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
+ {
+ let mut g = url.query_pairs_mut();
+ g.append_pair("cacheUsage", &self.cache_usage.to_string());
+ g.append_pair("channelBackend", &self.channel.backend);
+ g.append_pair("channelName", &self.channel.name);
+ g.append_pair("binCount", &format!("{}", self.bin_count));
+ g.append_pair(
+ "begDate",
+ &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
+ );
+ g.append_pair(
+ "endDate",
+ &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
+ );
+ }
+ {
+ binning_scheme_append_to_url(&self.agg_kind, url);
+ }
+ {
+ let mut g = url.query_pairs_mut();
+ g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size));
+ g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024));
+ g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
+ g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count));
+ g.append_pair("doLog", &format!("{}", self.do_log));
+ }
+ }
+}
+
+pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) {
+ let mut g = url.query_pairs_mut();
+ match agg_kind {
+ AggKind::EventBlobs => panic!(),
+ AggKind::TimeWeightedScalar => {
+ g.append_pair("binningScheme", "timeWeightedScalar");
+ }
+ AggKind::Plain => {
+ g.append_pair("binningScheme", "fullValue");
+ }
+ AggKind::DimXBins1 => {
+ g.append_pair("binningScheme", "unweightedScalar");
+ }
+ AggKind::DimXBinsN(n) => {
+ g.append_pair("binningScheme", "toScalarX");
+ g.append_pair("binnedXcount", &format!("{}", n));
+ }
+ }
+}
+
+pub fn agg_kind_from_binning_scheme(pairs: &BTreeMap<String, String>) -> Result<AggKind, Error> {
+ let key = "binningScheme";
+ let s = pairs
+ .get(key)
+ .map_or(Err(Error::with_msg(format!("can not find {}", key))), |k| Ok(k))?;
+ let ret = if s == "eventBlobs" {
+ AggKind::EventBlobs
+ } else if s == "fullValue" {
+ AggKind::Plain
+ } else if s == "timeWeightedScalar" {
+ AggKind::TimeWeightedScalar
+ } else if s == "unweightedScalar" {
+ AggKind::DimXBins1
+ } else if s == "binnedX" {
+ let u = pairs.get("binnedXcount").map_or("1", |k| k).parse()?;
+ AggKind::DimXBinsN(u)
+ } else {
+ return Err(Error::with_msg("can not extract binningScheme"));
+ };
+ Ok(ret)
+}
diff --git a/taskrun/src/taskrun.rs
b/taskrun/src/taskrun.rs index d3c1f23..314d408 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -95,13 +95,15 @@ pub fn tracing_init() { "archapp::archeng=info", "archapp::archeng::datablockstream=info", "archapp::archeng::indextree=info", - "archapp::archeng::blockstream=debug", + "archapp::archeng::blockrefstream=info", + "archapp::archeng::blockstream=info", "archapp::archeng::ringbuf=info", "archapp::archeng::backreadbuf=info", - "archapp::archeng::pipe=trace", + "archapp::archeng::pipe=info", "archapp::storagemerge=info", - "streams::rangefilter=debug", - "daqbuffer::test=trace", + "streams::rangefilter=info", + "disk::binned=info", + "daqbuffer::test=info", ] .join(","), ))