Enable fetch of channel info and events

Dominik Werder
2021-10-22 17:45:13 +02:00
parent dafe0a6e3b
commit 4d7ec67010
15 changed files with 363 additions and 85 deletions


@@ -1,5 +1,6 @@
pub mod datablockstream;
pub mod datastream;
pub mod pipe;
use crate::{EventsItem, PlainEvents, ScalarPlainEvents};
use async_channel::{Receiver, Sender};
@@ -7,8 +8,9 @@ use err::Error;
use futures_core::Future;
use futures_util::StreamExt;
use items::eventvalues::EventValues;
use items::{RangeCompletableItem, StreamItem};
use netpod::timeunits::SEC;
use netpod::{log::*, ChannelArchiver, DataHeaderPos, FilePos, Nanos};
use netpod::{log::*, ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, DataHeaderPos, FilePos, Nanos};
use std::convert::TryInto;
use std::io::{self, SeekFrom};
use std::path::PathBuf;
@@ -48,7 +50,9 @@ pub async fn open_read(path: PathBuf) -> io::Result<File> {
let res = OpenOptions::new().read(true).open(path).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
//info!("timed open_read dt: {:.3} ms", dt);
if false {
info!("timed open_read dt: {:.3} ms", dt);
}
res
}
@@ -57,7 +61,9 @@ async fn seek(file: &mut File, pos: SeekFrom) -> io::Result<u64> {
let res = file.seek(pos).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
//info!("timed seek dt: {:.3} ms", dt);
if false {
info!("timed seek dt: {:.3} ms", dt);
}
res
}
@@ -66,7 +72,9 @@ async fn read(file: &mut File, buf: &mut [u8]) -> io::Result<usize> {
let res = file.read(buf).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
//info!("timed read dt: {:.3} ms res: {:?}", dt, res);
if false {
info!("timed read dt: {:.3} ms res: {:?}", dt, res);
}
res
}
@@ -75,7 +83,9 @@ async fn read_exact(file: &mut File, buf: &mut [u8]) -> io::Result<usize> {
let res = file.read_exact(buf).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
//info!("timed read_exact dt: {:.3} ms res: {:?}", dt, res);
if false {
info!("timed read_exact dt: {:.3} ms res: {:?}", dt, res);
}
res
}
@@ -232,7 +242,7 @@ fn read_string(buf: &[u8]) -> Result<String, Error> {
.iter()
.map(|k| *k)
.enumerate()
.take_while(|&(i, k)| k != 0)
.take_while(|&(_, k)| k != 0)
.last()
.map(|(i, _)| i);
let ret = match imax {
@@ -481,12 +491,12 @@ pub async fn search_record(
//info!("looking at record i {}", i);
if rec.ts2.ns > beg.ns {
if node.is_leaf {
info!("found leaf match at {} / {}", i, nr);
trace!("found leaf match at {} / {}", i, nr);
let ret = RTreeNodeAtRecord { node, rix: i };
let stats = TreeSearchStats::new(ts1, node_reads);
return Ok((Some(ret), stats));
} else {
info!("found non-leaf match at {} / {}", i, nr);
trace!("found non-leaf match at {} / {}", i, nr);
let pos = FilePos { pos: rec.child_or_id };
node = read_rtree_node(file, pos, rtree_m).await?;
node_reads += 1;
@@ -668,9 +678,12 @@ pub async fn channel_list(index_path: PathBuf) -> Result<Vec<String>, Error> {
} else if basics.version == 3 {
&hver3
} else {
panic!();
return Err(Error::with_msg_no_trace(format!(
"unexpected version {}",
basics.version
)));
};
for (i, name_hash_entry) in basics.name_hash_entries.iter().enumerate() {
for (_i, name_hash_entry) in basics.name_hash_entries.iter().enumerate() {
if name_hash_entry.named_hash_channel_entry_pos != 0 {
let pos = FilePos {
pos: name_hash_entry.named_hash_channel_entry_pos,
@@ -684,7 +697,8 @@ pub async fn channel_list(index_path: PathBuf) -> Result<Vec<String>, Error> {
Ok(ret)
}
async fn datarange_stream_fill(channel_name: &str, tx: Sender<Datarange>) {
#[allow(dead_code)]
async fn datarange_stream_fill(_channel_name: &str, _tx: Sender<Datarange>) {
// Search the first relevant leaf node.
// Pipe all ranges from there, and continue with nodes.
// Issue: this can not stop early because the data files themselves are never inspected.
@@ -694,8 +708,8 @@ async fn datarange_stream_fill(channel_name: &str, tx: Sender<Datarange>) {
// Should contain enough information to allow one to open and position a relevant datafile.
pub struct Datarange {}
pub fn datarange_stream(channel_name: &str) -> Result<Receiver<Datarange>, Error> {
let (tx, rx) = async_channel::bounded(4);
pub fn datarange_stream(_channel_name: &str) -> Result<Receiver<Datarange>, Error> {
let (_tx, rx) = async_channel::bounded(4);
let task = async {};
taskrun::spawn(task);
Ok(rx)
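
As a side note on the stubbed-out producer above: the intended pattern is a spawned task that walks the index and pushes ranges into the bounded channel while the caller consumes the receiver. A minimal, self-contained sketch of just that plumbing, with a hypothetical payload (the real Datarange is still empty) and tokio::spawn standing in for taskrun::spawn:

use async_channel::{Receiver, Sender};

// Hypothetical payload; the crate's `Datarange` struct is still empty.
#[derive(Debug)]
struct DatarangeSketch {
    file_name: String,
    data_header_pos: u64,
}

async fn fill(tx: Sender<DatarangeSketch>) {
    // Stand-in for: search the first relevant leaf node, pipe all ranges
    // from there, then continue with the following nodes.
    for i in 0..3u64 {
        let item = DatarangeSketch {
            file_name: format!("data_{}", i),
            data_header_pos: i * 4096,
        };
        if tx.send(item).await.is_err() {
            // Receiver dropped; stop producing.
            break;
        }
    }
}

fn datarange_stream_sketch() -> Receiver<DatarangeSketch> {
    let (tx, rx) = async_channel::bounded(4);
    // The crate uses taskrun::spawn; plain tokio is used here for the sketch.
    tokio::spawn(fill(tx));
    rx
}

#[tokio::main]
async fn main() {
    let rx = datarange_stream_sketch();
    while let Ok(item) = rx.recv().await {
        println!("{:?}", item);
    }
}
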
@@ -761,6 +775,7 @@ impl DbrType {
Ok(res)
}
#[allow(dead_code)]
fn byte_len(&self) -> usize {
use DbrType::*;
match self {
@@ -999,6 +1014,73 @@ pub fn list_all_channels(node: &ChannelArchiver) -> Receiver<Result<String, Erro
rx
}
pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> Result<ChannelConfigResponse, Error> {
let mut type_info = None;
let mut stream = datablockstream::DatablockStream::for_channel_range(
q.range.clone(),
q.channel.clone(),
conf.data_base_paths.clone().into(),
true,
);
while let Some(item) = stream.next().await {
match item {
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::RangeComplete => (),
RangeCompletableItem::Data(k) => {
type_info = Some(k.type_info());
break;
}
},
StreamItem::Log(_) => (),
StreamItem::Stats(_) => (),
},
Err(e) => {
error!("{}", e);
()
}
}
}
if type_info.is_none() {
let mut stream = datablockstream::DatablockStream::for_channel_range(
q.range.clone(),
q.channel.clone(),
conf.data_base_paths.clone().into(),
false,
);
while let Some(item) = stream.next().await {
match item {
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::RangeComplete => (),
RangeCompletableItem::Data(k) => {
type_info = Some(k.type_info());
break;
}
},
StreamItem::Log(_) => (),
StreamItem::Stats(_) => (),
},
Err(e) => {
error!("{}", e);
()
}
}
}
}
if let Some(type_info) = type_info {
let ret = ChannelConfigResponse {
channel: q.channel.clone(),
scalar_type: type_info.0,
byte_order: None,
shape: type_info.1,
};
Ok(ret)
} else {
Err(Error::with_msg_no_trace("can not get channel type info"))
}
}
#[cfg(test)]
mod test {
// TODO move RangeFilter to a different crate (items?)
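
channel_config above determines the channel's scalar type and shape by pulling items from a datablock stream until the first data item appears, and retries once with the expand flag flipped if the first pass yields nothing. A minimal, self-contained stand-in for that probing pattern (all types here are simplified placeholders, not the crate's):

use futures_core::Stream;
use futures_util::{stream, StreamExt};

#[derive(Clone, Debug, PartialEq)]
struct TypeInfo(&'static str);

enum Item {
    Log,
    Data(TypeInfo),
}

// Pull items until the first data item yields type information.
async fn probe<S>(mut s: S) -> Option<TypeInfo>
where
    S: Stream<Item = Result<Item, String>> + Unpin,
{
    while let Some(item) = s.next().await {
        match item {
            Ok(Item::Data(t)) => return Some(t),
            Ok(Item::Log) => (),
            Err(e) => eprintln!("{}", e),
        }
    }
    None
}

#[tokio::main]
async fn main() {
    // First pass (expand enabled) yields nothing usable.
    let first = stream::iter(vec![Ok(Item::Log), Err("transient".to_string())]);
    // Second pass with the flag flipped yields a data item.
    let second = stream::iter(vec![Ok(Item::Log), Ok(Item::Data(TypeInfo("f64 scalar")))]);
    let type_info = match probe(first).await {
        Some(t) => Some(t),
        None => probe(second).await,
    };
    assert_eq!(type_info, Some(TypeInfo("f64 scalar")));
}

In channel_config itself the two passes differ only in the expand argument passed to DatablockStream::for_channel_range.
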


@@ -70,36 +70,40 @@ async fn datablock_stream_inner(
search_record(&mut index_file, basics.rtree_m, basics.rtree_start_pos, search_ts).await?;
if let Some(nrec) = res {
let rec = nrec.rec();
info!("found record: {:?}", rec);
trace!("found record: {:?}", rec);
let pos = FilePos { pos: rec.child_or_id };
// TODO rename Datablock? → IndexNodeDatablock
info!("\n\nREAD Datablock FROM {:?}\n", pos);
trace!("READ Datablock FROM {:?}\n", pos);
let datablock = read_index_datablockref(&mut index_file, pos).await?;
info!("\nDatablock: {:?}\n", datablock);
trace!("Datablock: {:?}\n", datablock);
let data_path = index_path.parent().unwrap().join(datablock.file_name());
if data_path == last_data_file_path && datablock.data_header_pos() == last_data_file_pos {
warn!("SKIP BECAUSE ITS THE SAME BLOCK");
debug!("skipping because it is the same block");
} else {
info!("try to open data_path: {:?}", data_path);
let mut data_file = open_read(data_path.clone()).await?;
let datafile_header =
read_datafile_header(&mut data_file, datablock.data_header_pos()).await?;
info!("datafile_header -------------- HEADER\n{:?}", datafile_header);
let events = read_data_1(&mut data_file, &datafile_header).await?;
info!("Was able to read data: {} events", events.len());
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events)));
tx.send(item).await?;
trace!("try to open data_path: {:?}", data_path);
match open_read(data_path.clone()).await {
Ok(mut data_file) => {
let datafile_header =
read_datafile_header(&mut data_file, datablock.data_header_pos()).await?;
trace!("datafile_header -------------- HEADER\n{:?}", datafile_header);
let events = read_data_1(&mut data_file, &datafile_header).await?;
trace!("Was able to read data: {} events", events.len());
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events)));
tx.send(item).await?;
}
Err(e) => {
// That's fine. The index references many datafiles which have already been purged.
trace!("can not find file mentioned in index: {:?} {}", data_path, e);
}
};
}
if datablock.next != 0 {
error!("datablock.next != 0: {:?}", datablock);
warn!("MAYBE TODO? datablock.next != 0: {:?}", datablock);
}
last_data_file_path = data_path;
last_data_file_pos = datablock.data_header_pos();
if expand {
err::todo()
} else {
search_ts.ns = rec.ts2.ns;
};
// TODO anything special to do in expand mode?
search_ts.ns = rec.ts2.ns;
} else {
warn!("nothing found, break");
break;
@@ -107,7 +111,7 @@ async fn datablock_stream_inner(
}
}
} else {
info!("can not find index file at {:?}", index_path);
warn!("can not find index file at {:?}", index_path);
}
}
Ok(())
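
The new open_read handling above treats a datafile that can no longer be opened as expected (the index references files which may already have been purged) instead of failing the whole stream. A self-contained sketch of that pattern, with tokio file I/O and plain stderr output standing in for the crate's helpers:

use std::path::Path;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;

// Returns Ok(None) when the file referenced by the index is gone; only
// read failures on a file that did open are propagated as errors.
async fn read_block_if_present(path: &Path) -> Result<Option<Vec<u8>>, std::io::Error> {
    match OpenOptions::new().read(true).open(path).await {
        Ok(mut file) => {
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await?;
            Ok(Some(buf))
        }
        Err(e) => {
            // Purged datafile: log and skip instead of failing the stream.
            eprintln!("can not find file mentioned in index: {:?} {}", path, e);
            Ok(None)
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    match read_block_if_present(Path::new("/tmp/does-not-exist")).await? {
        Some(buf) => println!("read {} bytes", buf.len()),
        None => println!("skipped missing datafile"),
    }
    Ok(())
}
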
@@ -118,7 +122,7 @@ pub struct DatablockStream {
channel: Channel,
base_dirs: VecDeque<PathBuf>,
expand: bool,
fut: Pin<Box<dyn Future<Output = FR>>>,
fut: Pin<Box<dyn Future<Output = FR> + Send>>,
rx: Receiver<Sitemty<EventsItem>>,
done: bool,
complete: bool,


@@ -9,7 +9,9 @@ pub struct DataStream {}
impl Stream for DataStream {
type Item = Sitemty<EventsItem>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let _ = self;
let _ = cx;
todo!()
}
}


@@ -0,0 +1,38 @@
use crate::archeng::datablockstream::DatablockStream;
use crate::events::{FrameMaker, FrameMakerTrait};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::Framable;
use netpod::{query::RawEventsQuery, ChannelArchiver};
use std::pin::Pin;
use streams::rangefilter::RangeFilter;
pub async fn make_event_pipe(
evq: &RawEventsQuery,
conf: &ChannelArchiver,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
// In order to extract something from the channel, the type of the channel needs to be looked up first.
//let ci = channel_info(&evq.channel, aa).await?;
/*let mut inps = vec![];
for p1 in &aa.data_base_paths {
let p2 = p1.clone();
let p3 = make_single_event_pipe(evq, p2).await?;
inps.push(p3);
}
let sm = StorageMerge {
inprng: inps.len() - 1,
current_inp_item: (0..inps.len()).into_iter().map(|_| None).collect(),
completed_inps: vec![false; inps.len()],
inps,
};*/
let range = evq.range.clone();
let channel = evq.channel.clone();
let expand = evq.agg_kind.need_expand();
let data = DatablockStream::for_channel_range(range.clone(), channel, conf.data_base_paths.clone().into(), expand);
let filtered = RangeFilter::new(data, range, expand);
let stream = filtered;
let mut frame_maker = Box::new(FrameMaker::untyped(evq.agg_kind.clone())) as Box<dyn FrameMakerTrait>;
let ret = stream.map(move |j| frame_maker.make_frame(j));
Ok(Box::pin(ret))
}
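
make_event_pipe composes the pipeline as datablock source → range filter → stateful frame maker and returns it as a pinned, boxed, Send stream. A simplified, self-contained stand-in for that shape, with a plain predicate filter in place of RangeFilter; it also shows why make_frame now takes &mut self (the maker keeps state inside the map closure):

use futures_core::Stream;
use futures_util::{stream, StreamExt};
use std::pin::Pin;

type Frame = Vec<u8>;

// Keeps state across items, so the map closure below must own it mutably.
struct FrameMakerSketch {
    frames_made: usize,
}

impl FrameMakerSketch {
    fn make_frame(&mut self, x: u32) -> Frame {
        self.frames_made += 1;
        x.to_le_bytes().to_vec()
    }
}

fn make_pipe(values: Vec<u32>, lo: u32, hi: u32) -> Pin<Box<dyn Stream<Item = Frame> + Send>> {
    let data = stream::iter(values);
    // Stand-in for RangeFilter: keep only items inside [lo, hi).
    let filtered = data.filter(move |x| {
        let keep = *x >= lo && *x < hi;
        async move { keep }
    });
    let mut frame_maker = FrameMakerSketch { frames_made: 0 };
    let framed = filtered.map(move |x| frame_maker.make_frame(x));
    Box::pin(framed)
}

#[tokio::main]
async fn main() {
    let mut pipe = make_pipe(vec![1, 5, 9, 13], 4, 12);
    while let Some(frame) = pipe.next().await {
        println!("frame of {} bytes", frame.len());
    }
}
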


@@ -163,13 +163,13 @@ impl Stream for StorageMerge {
}
}
trait FrameMakerTrait: Send {
fn make_frame(&self, ei: Sitemty<EventsItem>) -> Box<dyn Framable>;
pub trait FrameMakerTrait: Send {
fn make_frame(&mut self, ei: Sitemty<EventsItem>) -> Box<dyn Framable>;
}
struct FrameMaker {
scalar_type: ScalarType,
shape: Shape,
pub struct FrameMaker {
scalar_type: Option<ScalarType>,
shape: Option<Shape>,
agg_kind: AggKind,
}
@@ -181,6 +181,22 @@ impl FrameMaker {
{
err::todoval()
}
pub fn with_item_type(scalar_type: ScalarType, shape: Shape, agg_kind: AggKind) -> Self {
Self {
scalar_type: Some(scalar_type),
shape: Some(shape),
agg_kind,
}
}
pub fn untyped(agg_kind: AggKind) -> Self {
Self {
scalar_type: None,
shape: None,
agg_kind,
}
}
}
#[allow(unused_macros)]
@@ -334,18 +350,83 @@ macro_rules! arm1 {
}
impl FrameMakerTrait for FrameMaker {
fn make_frame(&self, item: Sitemty<EventsItem>) -> Box<dyn Framable> {
fn make_frame(&mut self, item: Sitemty<EventsItem>) -> Box<dyn Framable> {
// Take the expected inner type from `self`.
// If `item` carries no data, the expected `T` of the Sitemty can not be determined dynamically.
// Therefore it has to be decided based on the given parameters.
// See also channel_info in this mod.
match self.scalar_type {
ScalarType::I8 => arm1!(item, i8, Byte, self.shape, self.agg_kind),
ScalarType::I16 => arm1!(item, i16, Short, self.shape, self.agg_kind),
ScalarType::I32 => arm1!(item, i32, Int, self.shape, self.agg_kind),
ScalarType::F32 => arm1!(item, f32, Float, self.shape, self.agg_kind),
ScalarType::F64 => arm1!(item, f64, Double, self.shape, self.agg_kind),
_ => err::todoval(),
if self.scalar_type.is_none() || self.shape.is_none() {
//let scalar_type = &ScalarType::I8;
//let shape = &Shape::Scalar;
//let agg_kind = &self.agg_kind;
let (scalar_type, shape) = match &item {
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::RangeComplete => (ScalarType::I8, Shape::Scalar),
RangeCompletableItem::Data(k) => match k {
EventsItem::Plain(k) => match k {
PlainEvents::Scalar(k) => match k {
ScalarPlainEvents::Byte(_) => (ScalarType::I8, Shape::Scalar),
ScalarPlainEvents::Short(_) => (ScalarType::I16, Shape::Scalar),
ScalarPlainEvents::Int(_) => (ScalarType::I32, Shape::Scalar),
ScalarPlainEvents::Float(_) => (ScalarType::F32, Shape::Scalar),
ScalarPlainEvents::Double(_) => (ScalarType::F64, Shape::Scalar),
},
PlainEvents::Wave(k) => match k {
WavePlainEvents::Byte(k) => (ScalarType::I8, Shape::Wave(k.vals[0].len() as u32)),
WavePlainEvents::Short(k) => (ScalarType::I16, Shape::Wave(k.vals[0].len() as u32)),
WavePlainEvents::Int(k) => (ScalarType::I32, Shape::Wave(k.vals[0].len() as u32)),
WavePlainEvents::Float(k) => (ScalarType::F32, Shape::Wave(k.vals[0].len() as u32)),
WavePlainEvents::Double(k) => {
(ScalarType::F64, Shape::Wave(k.vals[0].len() as u32))
}
},
},
EventsItem::XBinnedEvents(k) => match k {
XBinnedEvents::Scalar(k) => match k {
ScalarPlainEvents::Byte(_) => (ScalarType::I8, Shape::Scalar),
ScalarPlainEvents::Short(_) => (ScalarType::I16, Shape::Scalar),
ScalarPlainEvents::Int(_) => (ScalarType::I32, Shape::Scalar),
ScalarPlainEvents::Float(_) => (ScalarType::F32, Shape::Scalar),
ScalarPlainEvents::Double(_) => (ScalarType::F64, Shape::Scalar),
},
XBinnedEvents::SingleBinWave(k) => match k {
SingleBinWaveEvents::Byte(_) => todo!(),
SingleBinWaveEvents::Short(_) => todo!(),
SingleBinWaveEvents::Int(_) => todo!(),
SingleBinWaveEvents::Float(_) => todo!(),
SingleBinWaveEvents::Double(_) => todo!(),
},
XBinnedEvents::MultiBinWave(k) => match k {
MultiBinWaveEvents::Byte(_) => todo!(),
MultiBinWaveEvents::Short(_) => todo!(),
MultiBinWaveEvents::Int(_) => todo!(),
MultiBinWaveEvents::Float(_) => todo!(),
MultiBinWaveEvents::Double(_) => todo!(),
},
},
},
},
StreamItem::Log(_) => (ScalarType::I8, Shape::Scalar),
StreamItem::Stats(_) => (ScalarType::I8, Shape::Scalar),
},
Err(_) => (ScalarType::I8, Shape::Scalar),
};
self.scalar_type = Some(scalar_type);
self.shape = Some(shape);
}
{
let scalar_type = self.scalar_type.as_ref().unwrap();
let shape = self.shape.as_ref().unwrap();
let agg_kind = &self.agg_kind;
match scalar_type {
ScalarType::I8 => arm1!(item, i8, Byte, shape, agg_kind),
ScalarType::I16 => arm1!(item, i16, Short, shape, agg_kind),
ScalarType::I32 => arm1!(item, i32, Int, shape, agg_kind),
ScalarType::F32 => arm1!(item, f32, Float, shape, agg_kind),
ScalarType::F64 => arm1!(item, f64, Double, shape, agg_kind),
_ => err::todoval(),
}
}
}
}
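
The branch above does lazy type detection: a FrameMaker built with untyped() starts with None for scalar type and shape and fills them in from the first item that actually carries data; log, stats and error items fall back to a default so a frame can still be produced, and once cached the values are never re-detected. A minimal stand-in for that caching pattern (placeholder types):

#[derive(Clone, Copy, Debug, PartialEq)]
enum ScalarTypeSketch {
    I8,
    F64,
}

enum ItemSketch {
    Log,
    Data(ScalarTypeSketch),
}

struct MakerSketch {
    scalar_type: Option<ScalarTypeSketch>,
}

impl MakerSketch {
    fn untyped() -> Self {
        Self { scalar_type: None }
    }

    fn make_frame(&mut self, item: &ItemSketch) -> ScalarTypeSketch {
        if self.scalar_type.is_none() {
            let st = match item {
                ItemSketch::Data(st) => *st,
                // No data to learn from: use a default, as the code above
                // does for Log, Stats and Err items.
                ItemSketch::Log => ScalarTypeSketch::I8,
            };
            self.scalar_type = Some(st);
        }
        // From here on the cached type drives frame construction.
        self.scalar_type.unwrap()
    }
}

fn main() {
    let mut maker = MakerSketch::untyped();
    assert_eq!(maker.make_frame(&ItemSketch::Log), ScalarTypeSketch::I8);
    // Once cached, later data items do not change the detected type,
    // mirroring the behavior of the code above.
    assert_eq!(maker.make_frame(&ItemSketch::Data(ScalarTypeSketch::F64)), ScalarTypeSketch::I8);
}
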
@@ -367,11 +448,11 @@ pub async fn make_event_pipe(
completed_inps: vec![false; inps.len()],
inps,
};
let frame_maker = Box::new(FrameMaker {
scalar_type: ci.scalar_type.clone(),
shape: ci.shape.clone(),
agg_kind: evq.agg_kind.clone(),
}) as Box<dyn FrameMakerTrait>;
let mut frame_maker = Box::new(FrameMaker::with_item_type(
ci.scalar_type.clone(),
ci.shape.clone(),
evq.agg_kind.clone(),
)) as Box<dyn FrameMakerTrait>;
let ret = sm.map(move |j| frame_maker.make_frame(j));
Ok(Box::pin(ret))
}


@@ -15,7 +15,7 @@ use items::waveevents::{WaveEvents, WaveXBinner};
use items::xbinnedscalarevents::XBinnedScalarEvents;
use items::xbinnedwaveevents::XBinnedWaveEvents;
use items::{Appendable, Clearable, EventsNodeProcessor, PushableIndex, SitemtyFrameType, WithLen, WithTimestamps};
use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape};
use netpod::{AggKind, ByteOrder, HasScalarType, HasShape, ScalarType, Shape};
#[cfg(not(feature = "devread"))]
pub use parsestub as parse;
@@ -202,6 +202,18 @@ pub enum WavePlainEvents {
Double(WaveEvents<f64>),
}
impl WavePlainEvents {
pub fn shape(&self) -> Result<Shape, Error> {
match self {
WavePlainEvents::Byte(k) => k.shape(),
WavePlainEvents::Short(k) => k.shape(),
WavePlainEvents::Int(k) => k.shape(),
WavePlainEvents::Float(k) => k.shape(),
WavePlainEvents::Double(k) => k.shape(),
}
}
}
fn _tmp1() {
let _ev = EventValues::<u8> {
tss: vec![],
@@ -250,7 +262,7 @@ impl WavePlainEvents {
fn x_aggregate(self, ak: &AggKind) -> EventsItem {
use WavePlainEvents::*;
let shape = self.shape();
let shape = self.shape().unwrap();
match self {
Byte(k) => wagg1!(k, ak, shape, Byte),
Short(k) => wagg1!(k, ak, shape, Short),
@@ -365,14 +377,15 @@ impl WithTimestamps for WavePlainEvents {
impl HasShape for WavePlainEvents {
fn shape(&self) -> Shape {
use WavePlainEvents::*;
/*use WavePlainEvents::*;
match self {
Byte(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
Short(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
Int(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
Float(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
Double(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
}
}*/
self.shape().unwrap()
}
}
@@ -946,8 +959,8 @@ impl HasShape for PlainEvents {
fn shape(&self) -> Shape {
use PlainEvents::*;
match self {
Scalar(h) => h.shape(),
Wave(h) => h.shape(),
Scalar(h) => HasShape::shape(h),
Wave(h) => HasShape::shape(h),
}
}
}
@@ -998,6 +1011,31 @@ impl EventsItem {
XBinnedEvents(k) => k.x_aggregate(ak),
}
}
pub fn type_info(&self) -> (ScalarType, Shape) {
match self {
EventsItem::Plain(k) => match k {
PlainEvents::Scalar(k) => match k {
ScalarPlainEvents::Byte(_) => (ScalarType::I8, Shape::Scalar),
ScalarPlainEvents::Short(_) => (ScalarType::I16, Shape::Scalar),
ScalarPlainEvents::Int(_) => (ScalarType::I32, Shape::Scalar),
ScalarPlainEvents::Float(_) => (ScalarType::F32, Shape::Scalar),
ScalarPlainEvents::Double(_) => (ScalarType::F64, Shape::Scalar),
},
PlainEvents::Wave(k) => match k {
// TODO
// Inherent issue for the non-static-type backends:
// there is a chance that we can't determine the shape here.
WavePlainEvents::Byte(k) => (ScalarType::I8, k.shape().unwrap()),
WavePlainEvents::Short(k) => (ScalarType::I16, k.shape().unwrap()),
WavePlainEvents::Int(k) => (ScalarType::I32, k.shape().unwrap()),
WavePlainEvents::Float(k) => (ScalarType::F32, k.shape().unwrap()),
WavePlainEvents::Double(k) => (ScalarType::F64, k.shape().unwrap()),
},
},
EventsItem::XBinnedEvents(_) => panic!(),
}
}
}
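
The TODO above names the underlying problem: for waveform channels the shape is only recoverable from the events themselves, so an empty batch has no shape, which is why the new shape() accessor returns a Result. A self-contained sketch with a simplified stand-in for WaveEvents:

struct WaveEventsSketch<T> {
    vals: Vec<Vec<T>>,
}

#[derive(Debug, PartialEq)]
enum ShapeSketch {
    Wave(u32),
}

// The wave length is read off the first event; with no events there is
// nothing to read, hence the Result.
fn wave_shape<T>(ev: &WaveEventsSketch<T>) -> Result<ShapeSketch, String> {
    match ev.vals.first() {
        Some(first) => Ok(ShapeSketch::Wave(first.len() as u32)),
        None => Err("can not determine shape of an empty wave batch".into()),
    }
}

fn main() {
    let full = WaveEventsSketch { vals: vec![vec![0f64; 16]] };
    let empty: WaveEventsSketch<f64> = WaveEventsSketch { vals: vec![] };
    assert_eq!(wave_shape(&full).unwrap(), ShapeSketch::Wave(16));
    assert!(wave_shape(&empty).is_err());
}
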
impl WithLen for EventsItem {