This commit is contained in:
Dominik Werder
2021-11-11 19:15:26 +01:00
parent 21e16cfa6d
commit ceb995f8ca
25 changed files with 974 additions and 485 deletions

View File

@@ -207,7 +207,7 @@ pub async fn channel_config_from_db(
pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> Result<ChannelConfigResponse, Error> {
let _timed = Timed::new("channel_config");
let mut type_info = None;
let stream = blockrefstream::blockref_stream(q.channel.clone(), q.range.clone().clone(), conf.clone());
let stream = blockrefstream::blockref_stream(q.channel.clone(), q.range.clone(), q.expand, conf.database.clone());
let stream = Box::pin(stream);
let stream = blockstream::BlockStream::new(stream, q.range.clone(), 1);
let mut stream = stream;
@@ -234,7 +234,8 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R
if type_info.is_none() {
let timed_normal = Timed::new("channel_config NORMAL");
warn!("channel_config expand mode returned none");
let stream = blockrefstream::blockref_stream(q.channel.clone(), q.range.clone().clone(), conf.clone());
let stream =
blockrefstream::blockref_stream(q.channel.clone(), q.range.clone(), q.expand, conf.database.clone());
let stream = Box::pin(stream);
let stream = blockstream::BlockStream::new(stream, q.range.clone(), 1);
let mut stream = stream;

View File

@@ -1,21 +1,18 @@
use crate::archeng::backreadbuf::BackReadBuf;
use crate::archeng::datablock::{read_data2, read_datafile_header2};
use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec};
use crate::archeng::indextree::{
read_datablockref2, DataheaderPos, Dataref, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget,
read_datablockref2, Dataref, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget,
};
use commonio::ringbuf::RingBuf;
use commonio::{open_read, StatsChannel};
use err::Error;
use futures_core::{Future, Stream};
use items::WithLen;
#[allow(unused)]
use netpod::log::*;
use netpod::{Channel, ChannelArchiver, NanoRange};
use netpod::{Channel, Database, NanoRange};
#[allow(unused)]
use serde::Serialize;
use serde_json::Value as JsVal;
use std::collections::{BTreeMap, VecDeque};
use std::collections::VecDeque;
use std::path::PathBuf;
use std::pin::Pin;
use tokio::fs::File;
@@ -41,37 +38,32 @@ enum Steps {
}
struct BlockrefStream {
conf: ChannelArchiver,
dbconf: Database,
channel: Channel,
range: NanoRange,
expand: bool,
steps: Steps,
paths: VecDeque<String>,
file1: Option<BackReadBuf<File>>,
file2: Option<RingBuf<File>>,
last_dp: u64,
last_dp2: u64,
last_f2: String,
last_dfhpos: DataheaderPos,
dfnotfound: BTreeMap<String, bool>,
data_bytes_read: u64,
same_dfh_count: u64,
}
impl BlockrefStream {
fn new(channel: Channel, range: NanoRange, conf: ChannelArchiver) -> Self {
fn new(channel: Channel, range: NanoRange, expand: bool, dbconf: Database) -> Self {
debug!("new BlockrefStream {:?}", range);
Self {
conf,
dbconf,
channel,
range,
expand,
steps: Steps::Start,
paths: VecDeque::new(),
file1: None,
file2: None,
last_dp: 0,
last_dp2: 0,
last_f2: String::new(),
last_dfhpos: DataheaderPos(u64::MAX),
dfnotfound: BTreeMap::new(),
data_bytes_read: 0,
same_dfh_count: 0,
}
@@ -88,7 +80,7 @@ impl BlockrefStream {
)))
}
SelectIndexFile => {
let dbc = database_connect(&self.conf.database).await?;
let dbc = database_connect(&self.dbconf).await?;
let sql = "select i.path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
let rows = dbc.query(sql, &[&self.channel.name()]).await?;
for row in rows {
@@ -113,13 +105,16 @@ impl BlockrefStream {
// For simplicity, simply read all storage classes linearly.
if let Some(path) = self.paths.pop_front() {
// TODO
let mut file = open_read(path.clone().into(), stats).await?;
let mut file = open_read(path.clone().into(), stats).await.map_err(|e| {
error!("can not open {:?}", path);
e
})?;
let basics = IndexFileBasics::from_file(&path, &mut file, stats).await?;
let mut tree = basics
.rtree_for_channel(self.channel.name(), stats)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not in index files"))?;
if let Some(iter) = tree.iter_range(self.range.clone(), stats).await? {
if let Some(iter) = tree.iter_range(self.range.clone(), self.expand, stats).await? {
debug!("SetupNextPath {:?}", path);
self.steps = ReadBlocks(iter, basics.hver().duplicate(), path.clone().into());
self.file1 = Some(BackReadBuf::new(file, 0, stats.clone()).await?);
@@ -136,76 +131,12 @@ impl BlockrefStream {
}
}
ReadBlocks(ref mut iter, ref hver, ref indexpath) => {
// TODO stats
let stats = &StatsChannel::dummy();
// TODO I need to keep some datafile open.
let item = if let Some(rec) = iter.next().await? {
// TODO the iterator should actually return Dataref. We never expect child nodes here.
if let RecordTarget::Dataref(dp) = rec.target {
let f1 = self.file1.as_mut().unwrap();
let dref = read_datablockref2(f1, dp.clone(), hver.as_ref()).await?;
let dpath = indexpath.parent().unwrap().join(dref.file_name());
// TODO Remember the index path, need it here for relative path.
// TODO open datafile, relative path to index path.
// TODO keep open when path does not change.
let acc;
let num_samples;
if false {
if let Some(_) = self.dfnotfound.get(dref.file_name()) {
num_samples = 0;
acc = 1;
} else {
if dref.file_name() == self.last_f2 {
acc = 2;
} else {
match open_read(dpath.clone(), stats).await {
Ok(f2) => {
acc = 4;
self.file2 = Some(
RingBuf::new(f2, dref.data_header_pos().0, StatsChannel::dummy())
.await?,
);
self.last_f2 = dref.file_name().into();
}
Err(_) => {
acc = 3;
self.file2 = None;
}
}
};
if let Some(f2) = self.file2.as_mut() {
if dref.file_name() == self.last_f2 && dref.data_header_pos() == self.last_dfhpos {
num_samples = 0;
} else {
self.last_dfhpos = dref.data_header_pos();
let rp1 = f2.rp_abs();
let dfheader = read_datafile_header2(f2, dref.data_header_pos()).await?;
let data = read_data2(f2, &dfheader, self.range.clone(), false).await?;
let rp2 = f2.rp_abs();
self.data_bytes_read += rp2 - rp1;
num_samples = dfheader.num_samples;
if data.len() != num_samples as usize {
if (data.len() as i64 - num_samples as i64).abs() < 4 {
// TODO get always one event less than num_samples tells us.
//warn!("small deviation {} vs {}", data.len(), num_samples);
} else {
return Err(Error::with_msg_no_trace(format!(
"event count mismatch {} vs {}",
data.len(),
num_samples
)));
}
}
}
} else {
self.dfnotfound.insert(dref.file_name().into(), true);
num_samples = 0;
};
}
} else {
acc = 6;
num_samples = 0;
}
let jsval = serde_json::to_value((
dp.0,
dp.0 as i64 - self.last_dp as i64,
@@ -213,15 +144,20 @@ impl BlockrefStream {
dref.data_header_pos.0,
dref.data_header_pos.0 as i64 - self.last_dp2 as i64,
dref.next().0,
acc,
num_samples,
))?;
self.last_dp = dp.0;
self.last_dp2 = dref.data_header_pos.0;
if rec.end.ns > self.range.end {
debug!("Have block end beyond range, stop");
self.steps = Done;
}
let bref = Blockref { dref, dpath };
trace!("emit {:?} Record range: {:?} TO {:?}", bref, rec.beg, rec.end);
BlockrefItem::Blockref(bref, jsval)
} else {
panic!();
error!("not a Dataref target");
self.steps = Done;
BlockrefItem::JsVal(JsVal::String(format!("not a Dataref target")))
}
} else {
debug!(
@@ -252,7 +188,47 @@ impl UnfoldExec for BlockrefStream {
pub fn blockref_stream(
channel: Channel,
range: NanoRange,
conf: ChannelArchiver,
expand: bool,
dbconf: Database,
) -> impl Stream<Item = Result<BlockrefItem, Error>> {
unfold_stream(BlockrefStream::new(channel, range, conf.clone()))
unfold_stream(BlockrefStream::new(channel, range, expand, dbconf))
}
#[cfg(test)]
mod test {
use super::*;
use futures_util::StreamExt;
use netpod::timeunits::SEC;
#[test]
fn find_ref_1() -> Result<(), Error> {
let fut = async move {
let channel = Channel {
backend: "sls-archive".into(),
name: "X05DA-FE-WI1:TC1".into(),
};
use chrono::{DateTime, Utc};
let dtbeg: DateTime<Utc> = "2021-10-01T00:00:00Z".parse()?;
let dtend: DateTime<Utc> = "2021-10-10T00:00:00Z".parse()?;
fn tons(dt: &DateTime<Utc>) -> u64 {
dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
}
let range = NanoRange {
beg: tons(&dtbeg),
end: tons(&dtend),
};
let dbconf = Database {
host: "localhost".into(),
name: "testingdaq".into(),
user: "testingdaq".into(),
pass: "testingdaq".into(),
};
let mut refs = Box::pin(blockref_stream(channel, range, false, dbconf));
while let Some(item) = refs.next().await {
info!("Got ref {:?}", item);
}
Ok(())
};
taskrun::run(fut)
}
}

View File

@@ -85,6 +85,7 @@ impl Future for FutA {
}
}
#[derive(Debug)]
pub enum BlockItem {
EventsItem(EventsItem),
JsVal(JsVal),
@@ -117,7 +118,7 @@ impl<S> BlockStream<S> {
where
S: Stream<Item = Result<BlockrefItem, Error>> + Unpin,
{
debug!("new BlockStream");
debug!("new BlockStream max_reads {} {:?}", max_reads, range);
Self {
inp,
inp_done: false,
@@ -311,11 +312,11 @@ where
}
}
if ev.len() == 1 {
debug!("From {} {:?} {}", item.fname, item.path, item.dpos.0);
debug!("See 1 event {:?}", Nanos::from_ns(ev.ts(0)));
trace!("From {} {:?} {}", item.fname, item.path, item.dpos.0);
trace!("See 1 event {:?}", Nanos::from_ns(ev.ts(0)));
} else if ev.len() > 1 {
debug!("From {} {:?} {}", item.fname, item.path, item.dpos.0);
debug!(
trace!("From {} {:?} {}", item.fname, item.path, item.dpos.0);
trace!(
"See {} events {:?} to {:?}",
ev.len(),
Nanos::from_ns(ev.ts(0)),
@@ -324,8 +325,8 @@ where
}
let mut contains_unordered = false;
for i in 0..ev.len() {
// TODO factor for performance.
let ts = ev.ts(i);
debug!("\nSEE EVENT {:?}", Nanos::from_ns(ts));
if ts < self.ts_max {
contains_unordered = true;
if true {
@@ -447,3 +448,335 @@ impl<S> Drop for BlockStream<S> {
trace!("Drop {:?}", self);
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::archeng::blockrefstream::blockref_stream;
use futures_util::StreamExt;
use items::{LogItem, RangeCompletableItem, StreamItem};
use netpod::{timeunits::SEC, Channel, Database};
use streams::rangefilter::RangeFilter;
struct EventCount {
pre: usize,
inside: usize,
post: usize,
raco: usize,
tss: Vec<u64>,
}
impl EventCount {
fn new() -> Self {
Self {
pre: 0,
inside: 0,
post: 0,
raco: 0,
tss: vec![],
}
}
}
async fn count_events(range: NanoRange, expand: bool, collect_ts: bool) -> Result<EventCount, Error> {
let channel = Channel {
backend: "sls-archive".into(),
name: "X05DA-FE-WI1:TC1".into(),
};
let dbconf = Database {
host: "localhost".into(),
name: "testingdaq".into(),
user: "testingdaq".into(),
pass: "testingdaq".into(),
};
let refs = Box::pin(blockref_stream(channel, range.clone(), expand, dbconf));
let blocks = BlockStream::new(refs, range.clone(), 1);
let events = blocks.map(|item| match item {
Ok(k) => match k {
BlockItem::EventsItem(k) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))),
BlockItem::JsVal(k) => Ok(StreamItem::Log(LogItem::quick(Level::TRACE, format!("{:?}", k)))),
},
Err(e) => Err(e),
});
let mut filtered = RangeFilter::new(events, range.clone(), expand);
let mut ret = EventCount::new();
while let Some(item) = filtered.next().await {
//info!("Got block {:?}", item);
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
ret.raco += 1;
}
RangeCompletableItem::Data(item) => {
let n = item.len();
for i in 0..n {
let ts = item.ts(i);
if ts < range.beg {
ret.pre += 1;
} else if ts < range.end {
ret.inside += 1;
} else {
ret.post += 1;
}
if collect_ts {
ret.tss.push(ts);
}
}
}
},
StreamItem::Log(_) => {}
StreamItem::Stats(_) => {}
},
Err(e) => {
return Err(e);
}
}
}
Ok(ret)
}
#[test]
fn read_blocks_one_event() -> Result<(), Error> {
let _ev1 = "2021-10-03T09:57:59.939651334Z";
let _ev2 = "2021-10-03T09:58:59.940910313Z";
let _ev3 = "2021-10-03T09:59:59.940112431Z";
let ev1ts = 1633255079939651334;
let ev2ts = 1633255139940910313;
// [ev1..ev2]
let range = NanoRange { beg: ev1ts, end: ev2ts };
let res = taskrun::run(count_events(range, false, true))?;
assert_eq!(res.pre, 0);
assert_eq!(res.inside, 1);
assert_eq!(res.post, 0);
assert_eq!(res.tss[0], ev1ts);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_one_event_expand() -> Result<(), Error> {
let ev1ts = 1633255079939651334;
let ev2ts = 1633255139940910313;
let range = NanoRange { beg: ev1ts, end: ev2ts };
let res = taskrun::run(count_events(range, true, true))?;
assert_eq!(res.pre, 1);
assert_eq!(res.inside, 1);
assert_eq!(res.post, 1);
assert_eq!(res.tss[1], ev1ts);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_two_events() -> Result<(), Error> {
let ev1ts = 1633255079939651334;
let ev2ts = 1633255139940910313;
let range = NanoRange {
beg: ev1ts,
end: ev2ts + 1,
};
let res = taskrun::run(count_events(range, false, true))?;
assert_eq!(res.pre, 0);
assert_eq!(res.inside, 2);
assert_eq!(res.post, 0);
assert_eq!(res.tss[0], ev1ts);
assert_eq!(res.tss[1], ev2ts);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_two_events_expand() -> Result<(), Error> {
let ev1ts = 1633255079939651334;
let ev2ts = 1633255139940910313;
let range = NanoRange {
beg: ev1ts,
end: ev2ts + 1,
};
let res = taskrun::run(count_events(range, true, true))?;
assert_eq!(res.pre, 1);
assert_eq!(res.inside, 2);
assert_eq!(res.post, 1);
assert_eq!(res.tss[1], ev1ts);
assert_eq!(res.tss[2], ev2ts);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_many_1() -> Result<(), Error> {
use chrono::{DateTime, Utc};
let _early = "2021-10-06T00:00:00Z";
let _late = "2021-10-07T00:00:00Z";
let dtbeg: DateTime<Utc> = _early.parse()?;
let dtend: DateTime<Utc> = _late.parse()?;
fn tons(dt: &DateTime<Utc>) -> u64 {
dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
}
let range = NanoRange {
beg: tons(&dtbeg),
end: tons(&dtend),
};
let res = taskrun::run(count_events(range, false, false))?;
assert_eq!(res.pre, 0);
assert_eq!(res.inside, 77);
assert_eq!(res.post, 0);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_many_2() -> Result<(), Error> {
use chrono::{DateTime, Utc};
let _early = "2021-09-01T00:00:00Z";
let _late = "2021-10-07T00:00:00Z";
let dtbeg: DateTime<Utc> = _early.parse()?;
let dtend: DateTime<Utc> = _late.parse()?;
fn tons(dt: &DateTime<Utc>) -> u64 {
dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
}
let range = NanoRange {
beg: tons(&dtbeg),
end: tons(&dtend),
};
let res = taskrun::run(count_events(range, false, false))?;
assert_eq!(res.pre, 0);
assert_eq!(res.inside, 20328);
assert_eq!(res.post, 0);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_many_3() -> Result<(), Error> {
use chrono::{DateTime, Utc};
let _early = "2021-08-01T00:00:00Z";
let _late = "2021-10-07T00:00:00Z";
let dtbeg: DateTime<Utc> = _early.parse()?;
let dtend: DateTime<Utc> = _late.parse()?;
fn tons(dt: &DateTime<Utc>) -> u64 {
dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
}
let range = NanoRange {
beg: tons(&dtbeg),
end: tons(&dtend),
};
let res = taskrun::run(count_events(range, false, false))?;
assert_eq!(res.pre, 0);
assert_eq!(res.inside, 35438);
assert_eq!(res.post, 0);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_many_3_expand() -> Result<(), Error> {
use chrono::{DateTime, Utc};
let _early = "2021-08-01T00:00:00Z";
let _late = "2021-10-07T00:00:00Z";
let dtbeg: DateTime<Utc> = _early.parse()?;
let dtend: DateTime<Utc> = _late.parse()?;
fn tons(dt: &DateTime<Utc>) -> u64 {
dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
}
let range = NanoRange {
beg: tons(&dtbeg),
end: tons(&dtend),
};
let res = taskrun::run(count_events(range, true, false))?;
assert_eq!(res.pre, 1);
assert_eq!(res.inside, 35438);
assert_eq!(res.post, 1);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_many_4() -> Result<(), Error> {
use chrono::{DateTime, Utc};
let _early = "2020-01-01T00:00:00Z";
let _late = "2021-10-07T00:00:00Z";
let dtbeg: DateTime<Utc> = _early.parse()?;
let dtend: DateTime<Utc> = _late.parse()?;
fn tons(dt: &DateTime<Utc>) -> u64 {
dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
}
let range = NanoRange {
beg: tons(&dtbeg),
end: tons(&dtend),
};
let res = taskrun::run(count_events(range, false, false))?;
assert_eq!(res.pre, 0);
assert_eq!(res.inside, 71146);
assert_eq!(res.post, 0);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_many_4_expand() -> Result<(), Error> {
use chrono::{DateTime, Utc};
let _early = "2020-01-01T00:00:00Z";
let _late = "2021-10-07T00:00:00Z";
let dtbeg: DateTime<Utc> = _early.parse()?;
let dtend: DateTime<Utc> = _late.parse()?;
fn tons(dt: &DateTime<Utc>) -> u64 {
dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
}
let range = NanoRange {
beg: tons(&dtbeg),
end: tons(&dtend),
};
let res = taskrun::run(count_events(range, true, false))?;
assert_eq!(res.pre, 0);
assert_eq!(res.inside, 71146);
assert_eq!(res.post, 1);
assert_eq!(res.raco, 1);
Ok(())
}
#[test]
fn read_blocks_late() -> Result<(), Error> {
use chrono::{DateTime, Utc};
let _early = "2021-10-01T00:00:00Z";
let _late = "2021-12-01T00:00:00Z";
let dtbeg: DateTime<Utc> = _early.parse()?;
let dtend: DateTime<Utc> = _late.parse()?;
fn tons(dt: &DateTime<Utc>) -> u64 {
dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
}
let range = NanoRange {
beg: tons(&dtbeg),
end: tons(&dtend),
};
let res = taskrun::run(count_events(range, false, false))?;
assert_eq!(res.pre, 0);
assert_eq!(res.inside, 3000);
assert_eq!(res.post, 0);
assert_eq!(res.raco, 0);
Ok(())
}
#[test]
fn read_blocks_late_expand() -> Result<(), Error> {
use chrono::{DateTime, Utc};
let _early = "2021-10-01T00:00:00Z";
let _late = "2021-12-01T00:00:00Z";
let dtbeg: DateTime<Utc> = _early.parse()?;
let dtend: DateTime<Utc> = _late.parse()?;
fn tons(dt: &DateTime<Utc>) -> u64 {
dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64
}
let range = NanoRange {
beg: tons(&dtbeg),
end: tons(&dtend),
};
let res = taskrun::run(count_events(range, true, false))?;
assert_eq!(res.pre, 1);
assert_eq!(res.inside, 3000);
assert_eq!(res.post, 0);
assert_eq!(res.raco, 0);
Ok(())
}
}

View File

@@ -232,6 +232,7 @@ impl Stream for ConfigStream {
let q = ChannelConfigQuery {
channel,
range: NanoRange { beg, end },
expand: true,
};
let fut = super::channel_config(&q, &conf);
let fut = tokio::time::timeout(Duration::from_millis(2000), fut);

View File

@@ -217,7 +217,7 @@ pub struct DatablockStream {
}
impl DatablockStream {
pub fn for_channel_range(
pub fn _for_channel_range(
range: NanoRange,
channel: Channel,
base_dirs: VecDeque<PathBuf>,
@@ -332,6 +332,10 @@ mod test {
#[test]
fn read_file_basic_info() -> Result<(), Error> {
// TODO redo test
if true {
panic!();
}
let fut = async {
// file begin archive_X05DA_SH/20211001/20211001: 1633039259
// 1633145759
@@ -354,7 +358,7 @@ mod test {
.map(PathBuf::from)
.collect();
let expand = false;
let datablocks = DatablockStream::for_channel_range(range.clone(), channel, base_dirs, expand, u64::MAX);
let datablocks = DatablockStream::_for_channel_range(range.clone(), channel, base_dirs, expand, u64::MAX);
let filtered = RangeFilter::<_, EventsItem>::new(datablocks, range, expand);
let mut stream = filtered;
while let Some(block) = stream.next().await {

View File

@@ -536,45 +536,115 @@ impl Rtree {
Ok(node)
}
pub async fn iter_range(&mut self, range: NanoRange, stats: &StatsChannel) -> Result<Option<RecordIter>, Error> {
pub async fn iter_range(
&mut self,
range: NanoRange,
expand: bool,
stats: &StatsChannel,
) -> Result<Option<RecordIter>, Error> {
debug!("iter_range search for {:?}", range);
// TODO RecordIter needs to know when to stop after range.
let ts1 = Instant::now();
let mut stack = VecDeque::new();
let mut node = self.read_node_at(self.root.clone(), stats).await?;
debug!("have root node {:?}", node);
let mut node_reads = 1;
'outer: loop {
let nr = node.records.len();
for (i, rec) in node.records.iter().enumerate() {
if rec.beg.ns > range.beg {
match &rec.target {
RecordTarget::Child(child) => {
trace!("found non-leaf match at {} / {}", i, nr);
let child = child.clone();
let nr = RtreeNodeAtRecord { node, rix: i };
node = self.read_node_at(child, stats).await?;
node_reads += 1;
stack.push_back(nr);
continue 'outer;
}
RecordTarget::Dataref(_dataref) => {
trace!("found leaf match at {} / {}", i, nr);
let nr = RtreeNodeAtRecord { node, rix: i };
stack.push_back(nr);
let ret = RecordIter {
tree: self.reopen(stats).await?,
stack,
stats: stats.clone(),
};
let stats = TreeSearchStats::new(ts1, node_reads);
trace!("iter_range done stats: {:?}", stats);
return Ok(Some(ret));
if let Some(rec0) = node.records.first() {
if rec0.beg.ns > range.beg || (expand && rec0.beg.ns == range.beg) {
debug!("Start at begin of tree");
'outer: loop {
let nrlen = node.records.len();
for (i, rec) in node.records.iter().enumerate() {
if true {
match &rec.target {
RecordTarget::Child(child) => {
trace!("found non-leaf match at {} / {}", i, nrlen);
let child = child.clone();
let nr = RtreeNodeAtRecord { node, rix: i };
node = self.read_node_at(child, stats).await?;
node_reads += 1;
stack.push_back(nr);
continue 'outer;
}
RecordTarget::Dataref(_dataref) => {
trace!("found leaf match at {} / {}", i, nrlen);
let nr = RtreeNodeAtRecord { node, rix: i };
stack.push_back(nr);
let ret = RecordIter {
tree: self.reopen(stats).await?,
stack,
range: range.clone(),
expand,
had_post: false,
done: false,
stats: stats.clone(),
};
let stats = TreeSearchStats::new(ts1, node_reads);
trace!("iter_range done stats: {:?}", stats);
return Ok(Some(ret));
}
}
}
}
//let stats = TreeSearchStats::new(ts1, node_reads);
//trace!("loop did not find something, iter_range done stats: {:?}", stats);
//return Ok(None);
return Err(Error::with_msg_no_trace("can not find the first leaf"));
}
} else {
debug!("Search within the tree");
'outer2: loop {
let nr = node.records.len();
for (i, rec) in node.records.iter().enumerate().rev() {
if rec.beg.ns < range.beg || (!expand && rec.beg.ns == range.beg) {
match &rec.target {
RecordTarget::Child(child) => {
trace!("found non-leaf match at {} / {}", i, nr);
let child = child.clone();
let nr = RtreeNodeAtRecord { node, rix: i };
node = self.read_node_at(child, stats).await?;
node_reads += 1;
stack.push_back(nr);
continue 'outer2;
}
RecordTarget::Dataref(_dataref) => {
trace!("found leaf match at {} / {}", i, nr);
let nr = RtreeNodeAtRecord { node, rix: i };
stack.push_back(nr);
let ret = RecordIter {
tree: self.reopen(stats).await?,
stack,
range: range.clone(),
expand,
had_post: false,
done: false,
stats: stats.clone(),
};
let stats = TreeSearchStats::new(ts1, node_reads);
trace!("iter_range done stats: {:?}", stats);
return Ok(Some(ret));
}
}
}
}
//let stats = TreeSearchStats::new(ts1, node_reads);
//trace!("loop did not find something, iter_range done stats: {:?}", stats);
//return Ok(None);
return Err(Error::with_msg_no_trace("expected to find a leaf"));
}
}
let stats = TreeSearchStats::new(ts1, node_reads);
trace!("iter_range done stats: {:?}", stats);
return Ok(None);
} else {
debug!("no records at all");
let ret = RecordIter {
tree: self.reopen(stats).await?,
stack,
range: range.clone(),
expand,
had_post: false,
done: false,
stats: stats.clone(),
};
return Ok(Some(ret));
}
}
@@ -595,11 +665,21 @@ impl Rtree {
pub struct RecordIter {
tree: Rtree,
stack: VecDeque<RtreeNodeAtRecord>,
range: NanoRange,
expand: bool,
had_post: bool,
done: bool,
stats: StatsChannel,
}
impl RecordIter {
pub async fn next(&mut self) -> Result<Option<RtreeRecord>, Error> {
if self.done {
return Ok(None);
}
if self.had_post {
self.done = true;
}
match self.stack.back_mut() {
Some(nr) => {
assert_eq!(nr.node.is_leaf, true);
@@ -607,6 +687,13 @@ impl RecordIter {
let ret = ret.clone();
if nr.advance()? {
//trace!("still more records here {} / {}", nr.rix, nr.node.records.len());
let beg2 = nr.rec().unwrap().beg.ns;
if beg2 >= self.range.end {
self.had_post = true;
if !self.expand {
self.done = true;
}
}
} else {
loop {
if self.stack.pop_back().is_none() {
@@ -636,6 +723,13 @@ impl RecordIter {
RecordTarget::Dataref(_) => {
trace!("loop B is-leaf");
// done, we've positioned the next result.
let beg2 = n2.rec().unwrap().beg.ns;
if beg2 >= self.range.end {
self.had_post = true;
if !self.expand {
self.done = true;
}
}
break;
}
}
@@ -697,6 +791,7 @@ impl TreeSearchStats {
}
}
// TODO get rid of this in favor of RecordIter.
pub async fn search_record(
file: &mut File,
rtree_m: usize,
@@ -732,6 +827,7 @@ pub async fn search_record(
}
}
// TODO get rid of this in favor of RecordIter.
pub async fn search_record_expand_try(
file: &mut File,
rtree_m: usize,
@@ -770,6 +866,7 @@ pub async fn search_record_expand_try(
}
}
// TODO get rid of this in favor of RecordIter.
pub async fn search_record_expand(
file: &mut File,
rtree_m: usize,
@@ -1111,7 +1208,7 @@ mod test {
.await?
.ok_or_else(|| Error::with_msg("no tree found for channel"))?;
let mut iter = tree
.iter_range(range, stats)
.iter_range(range, false, stats)
.await?
.ok_or_else(|| Error::with_msg("could not position iterator"))?;
let mut i1 = 0;
@@ -1148,7 +1245,7 @@ mod test {
.await?
.ok_or_else(|| Error::with_msg("no tree found for channel"))?;
let mut iter = tree
.iter_range(range, stats)
.iter_range(range, false, stats)
.await?
.ok_or_else(|| Error::with_msg("could not position iterator"))?;
let mut i1 = 0;

View File

@@ -23,12 +23,18 @@ pub async fn make_event_pipe(
let q = ChannelConfigQuery {
channel: evq.channel.clone(),
range: evq.range.clone(),
expand: evq.agg_kind.need_expand(),
};
crate::archeng::channel_config_from_db(&q, &conf).await?
};
debug!("Channel config: {:?}", channel_config);
use crate::archeng::blockstream::BlockItem;
let refs = blockref_stream(evq.channel.clone(), evq.range.clone(), conf.clone());
let refs = blockref_stream(
evq.channel.clone(),
evq.range.clone(),
evq.agg_kind.need_expand(),
conf.database.clone(),
);
let blocks = BlockStream::new(Box::pin(refs), evq.range.clone(), 1);
let blocks = blocks.map(|k| match k {
Ok(item) => match item {
@@ -84,10 +90,13 @@ pub async fn make_event_pipe(
Ok(Box::pin(ret))
}
pub async fn make_event_pipe1(
pub async fn _make_event_pipe1(
evq: &RawEventsQuery,
conf: ChannelArchiver,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
// TODO unused
err::todo();
let range = evq.range.clone();
let channel = evq.channel.clone();
let expand = evq.agg_kind.need_expand();
@@ -99,11 +108,12 @@ pub async fn make_event_pipe1(
let q = ChannelConfigQuery {
channel: channel.clone(),
range: range.clone(),
expand: false,
};
crate::archeng::channel_config_from_db(&q, &conf).await?
};
let data = DatablockStream::for_channel_range(
let data = DatablockStream::_for_channel_range(
range.clone(),
channel,
conf.data_base_paths.clone().into(),

View File

@@ -82,6 +82,7 @@ impl FrameMaker {
#[allow(unused_macros)]
macro_rules! events_item_to_sitemty {
($ei:expr, $t1:ident, $t2:ident, $t3:ident) => {{
let combo = format!("t1 {} t2 {} t3 {}", stringify!($t1), stringify!($t2), stringify!($t3));
let ret = match $ei {
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
@@ -95,13 +96,22 @@ macro_rules! events_item_to_sitemty {
//
match h {
$t2::$t3(h) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))),
_ => panic!(),
_ => {
warn!("case AA {}", combo);
panic!()
}
}
}
_ => panic!(),
_ => {
warn!("case BB {}", combo);
panic!()
}
}
}
_ => panic!(),
_ => {
warn!("case CC {}", combo);
panic!()
}
}
}
RangeCompletableItem::RangeComplete => {
@@ -120,6 +130,16 @@ macro_rules! events_item_to_sitemty {
macro_rules! arm2 {
($item:expr, $t1:ident, $t2:ident, $t3:ident, $t4:ident, $t5:ident, $sty1:ident, $sty2:ident) => {{
type T1 = $t1<$sty1>;
let combo = format!(
"t1 {} t2 {} t3 {} t4 {} t5 {} sty1 {} sty2 {}",
stringify!($t1),
stringify!($t2),
stringify!($t3),
stringify!($t4),
stringify!($t5),
stringify!($sty1),
stringify!($sty2)
);
let ret: Sitemty<T1> = match $item {
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
@@ -133,13 +153,18 @@ macro_rules! arm2 {
//
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
}
_ => panic!(),
_ => {
warn!("unclear what to do A {}", combo);
err::todoval()
}
},
_ => panic!(),
_ => {
warn!("unclear what to do B {}", combo);
err::todoval()
}
},
_ => {
error!("unexpected arm2 case");
error!("unexpected arm2 case {}", combo);
err::todoval()
}
},
@@ -157,7 +182,10 @@ macro_rules! arm1 {
($item:expr, $sty1:ident, $sty2:ident, $shape:expr, $ak:expr) => {{
match $shape {
Shape::Scalar => match $ak {
AggKind::EventBlobs => panic!(),
AggKind::EventBlobs => {
warn!("arm1 unhandled EventBlobs");
panic!()
}
AggKind::Plain => arm2!(
$item,
EventValues,
@@ -168,6 +196,16 @@ macro_rules! arm1 {
$sty1,
$sty2
),
AggKind::TimeWeightedScalar => arm2!(
$item,
EventValues,
XBinnedEvents,
XBinnedEvents,
Scalar,
ScalarPlainEvents,
$sty1,
$sty2
),
AggKind::DimXBins1 => arm2!(
$item,
EventValues,
@@ -178,7 +216,6 @@ macro_rules! arm1 {
$sty1,
$sty2
),
AggKind::TimeWeightedScalar => panic!(),
AggKind::DimXBinsN(_) => arm2!(
$item,
EventValues,
@@ -191,7 +228,10 @@ macro_rules! arm1 {
),
},
Shape::Wave(_) => match $ak {
AggKind::EventBlobs => panic!(),
AggKind::EventBlobs => {
warn!("arm1 unhandled EventBlobs");
panic!()
}
AggKind::Plain => arm2!(
$item,
WaveEvents,
@@ -202,7 +242,16 @@ macro_rules! arm1 {
$sty1,
$sty2
),
AggKind::TimeWeightedScalar => panic!(),
AggKind::TimeWeightedScalar => arm2!(
$item,
XBinnedScalarEvents,
XBinnedEvents,
XBinnedEvents,
SingleBinWave,
SingleBinWaveEvents,
$sty1,
$sty2
),
AggKind::DimXBins1 => arm2!(
$item,
XBinnedScalarEvents,