WIP interface with existing code

This commit is contained in:
Dominik Werder
2021-07-12 13:19:12 +02:00
parent 35f3f9249e
commit 9509b43848
3 changed files with 186 additions and 40 deletions

View File

@@ -3,15 +3,21 @@ use crate::EventsItem;
use chrono::{TimeZone, Utc};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::eventvalues::EventValues;
use items::{Framable, RangeCompletableItem, StreamItem};
use items::{Framable, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::timeunits::{DAY, SEC};
use netpod::{ArchiverAppliance, Channel, ChannelInfo, ScalarType, Shape};
use serde_json::Value as JsonValue;
use std::collections::VecDeque;
use std::future::Future;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::{read_dir, File};
struct DataFilename {
@@ -39,19 +45,155 @@ fn parse_data_filename(s: &str) -> Result<DataFilename, Error> {
Ok(ret)
}
type SMI = Box<dyn Framable>;
type REFF = Pin<Box<dyn Future<Output = Option<SMI>>>>;
struct StorageMerge {
inps: Vec<Pin<Box<dyn Stream<Item = Sitemty<EventsItem>> + Send>>>,
completed_inps: Vec<bool>,
current_inp_item: Vec<Option<EventsItem>>,
}
impl StorageMerge {
fn refill_if_needed(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(Pin<&mut Self>, bool), Error> {
use Poll::*;
let mut is_pending = false;
for i in 0..self.inps.len() {
if self.current_inp_item[i].is_none() && self.completed_inps[i] == false {
match self.inps[i].poll_next_unpin(cx) {
Ready(j) => {
//
match j {
Some(j) => match j {
Ok(j) => match j {
StreamItem::DataItem(j) => match j {
RangeCompletableItem::Data(j) => {
self.current_inp_item[i] = Some(j);
}
RangeCompletableItem::RangeComplete => {}
},
StreamItem::Log(_) => {}
StreamItem::Stats(_) => {}
},
Err(e) => {
self.completed_inps[i] = true;
error!("inp err {:?}", e);
}
},
None => {
//
self.completed_inps[i] = true;
}
}
}
Pending => {
is_pending = true;
}
}
}
}
Ok((self, is_pending))
}
fn decide_next_item(&mut self) -> Result<Option<Sitemty<EventsItem>>, Error> {
// TODO
// Keep index of how low priority is allowed. Start with all of the allowed.
// For all inputs, keep one event batch in tmp if available, or mark as exhausted.
let mut cursrc = self.inps.len() - 1;
err::todoval()
}
}
impl Stream for StorageMerge {
type Item = Sitemty<EventsItem>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let (mut self2, is_pending) = self.refill_if_needed(cx).unwrap();
if is_pending {
Pending
} else {
match self2.decide_next_item() {
Ok(j) => Ready(j),
Err(e) => {
error!("impl Stream for StorageMerge {:?}", e);
panic!()
}
}
}
}
}
trait FrameMakerTrait: Send {
fn make_frame(&self, ei: Sitemty<EventsItem>) -> Box<dyn Framable>;
}
struct FrameMaker<NTY> {
_m1: PhantomData<NTY>,
}
impl<NTY> FrameMakerTrait for FrameMaker<NTY>
where
NTY: Send,
{
fn make_frame(&self, ei: Sitemty<EventsItem>) -> Box<dyn Framable> {
match ei {
Ok(j) => match j {
StreamItem::DataItem(j) => match j {
RangeCompletableItem::Data(j) => match j {
EventsItem::ScalarDouble(j) => {
let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(j)));
Box::new(b)
}
_ => panic!(),
},
_ => panic!(),
},
_ => panic!(),
},
_ => panic!(),
}
}
}
pub async fn make_event_pipe(
evq: &RawEventsQuery,
aa: &ArchiverAppliance,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
let _channel_info = channel_info(&evq.channel, aa).await?;
let mut inps = vec![];
for p1 in &aa.data_base_paths {
let p2 = p1.clone();
let p3 = make_single_event_pipe(evq, p2).await?;
inps.push(p3);
}
let sm = StorageMerge {
current_inp_item: (0..inps.len()).into_iter().map(|_| None).collect(),
completed_inps: vec![false; inps.len()],
inps,
};
let frame_maker = if true {
Box::new(FrameMaker::<f64> { _m1: PhantomData }) as Box<dyn FrameMakerTrait>
} else {
Box::new(FrameMaker::<f32> { _m1: PhantomData })
};
let ret = sm.map(move |j| frame_maker.make_frame(j));
Ok(Box::pin(ret))
//err::todoval()
}
pub async fn make_single_event_pipe(
evq: &RawEventsQuery,
base_path: PathBuf,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventsItem>> + Send>>, Error> {
info!("make_event_pipe {:?}", evq);
let evq = evq.clone();
let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, aa)?;
let channel_info = channel_info(&evq.channel, aa).await?;
let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, base_path)?;
//let dtbeg = Utc.timestamp((evq.range.beg / 1000000000) as i64, (evq.range.beg % 1000000000) as u32);
let (tx, rx) = async_channel::bounded(16);
let block1 = async move {
trace!("++++++++++++++++++++++++++++");
info!("++++++++++++++++++++++++++++");
info!("start read of {:?}", dir);
// TODO first collect all matching filenames, then sort, then open files.
@@ -75,38 +217,10 @@ pub async fn make_event_pipe(
info!("✓ read header {:?}", pbr.payload_type());
loop {
match pbr.read_msg().await {
Ok(ev) => {
//
Ok(ei) => {
info!("read msg from file");
match ev {
EventsItem::ScalarDouble(h) => {
//
let (x, y) = h
.tss
.into_iter()
.zip(h.values.into_iter())
.filter_map(|(j, k)| {
if j < evq.range.beg || j >= evq.range.end {
None
} else {
Some((j, k))
}
})
.fold((vec![], vec![]), |(mut a, mut b), (j, k)| {
a.push(j);
b.push(k);
(a, b)
});
let b = EventValues { tss: x, values: y };
let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(b)));
tx.send(Box::new(b) as Box<dyn Framable>).await?;
}
_ => {
//
error!("case not covered");
return Err(Error::with_msg_no_trace("todo"));
}
}
let g = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ei)));
tx.send(g).await?;
}
Err(e) => {
error!("error while reading msg {:?}", e);
@@ -138,16 +252,47 @@ pub async fn make_event_pipe(
Ok(Box::pin(rx))
}
fn events_item_to_framable(ei: EventsItem) -> Result<Box<dyn Framable + Send>, Error> {
match ei {
EventsItem::ScalarDouble(h) => {
/*let (x, y) = h
.tss
.into_iter()
.zip(h.values.into_iter())
.filter_map(|(j, k)| {
if j < evq.range.beg || j >= evq.range.end {
None
} else {
Some((j, k))
}
})
.fold((vec![], vec![]), |(mut a, mut b), (j, k)| {
a.push(j);
b.push(k);
(a, b)
});
let b = EventValues { tss: x, values: y };*/
let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(h)));
let ret = Box::new(b);
Ok(ret)
}
_ => {
error!("case not covered");
Err(Error::with_msg_no_trace("todo"))
}
}
}
struct DirAndPrefix {
dir: PathBuf,
prefix: String,
}
fn directory_for_channel_files(channel: &Channel, aa: &ArchiverAppliance) -> Result<DirAndPrefix, Error> {
fn directory_for_channel_files(channel: &Channel, base_path: PathBuf) -> Result<DirAndPrefix, Error> {
// SARUN11/CVME/DBLM546/IOC_CPU_LOAD
// SARUN11-CVME-DBLM546:IOC_CPU_LOAD
let a: Vec<_> = channel.name.split("-").map(|s| s.split(":")).flatten().collect();
let path = aa.data_base_path.clone();
let path = base_path;
let path = a.iter().take(a.len() - 1).fold(path, |a, &x| a.join(x));
let ret = DirAndPrefix {
dir: path,
@@ -160,7 +305,8 @@ fn directory_for_channel_files(channel: &Channel, aa: &ArchiverAppliance) -> Res
}
pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result<ChannelInfo, Error> {
let DirAndPrefix { dir, prefix } = directory_for_channel_files(channel, aa)?;
let DirAndPrefix { dir, prefix } =
directory_for_channel_files(channel, aa.data_base_paths.first().unwrap().clone())?;
let mut msgs = vec![];
msgs.push(format!("path: {}", dir.to_string_lossy()));
let mut scalar_type = None;

View File

@@ -340,7 +340,7 @@ pub async fn scan_files_inner(
let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?;
let mut paths = VecDeque::new();
paths.push_back(
aa.data_base_path.join(
aa.data_base_paths.first().unwrap().join(
pairs
.get("subpath")
.ok_or_else(|| Error::with_msg("subpatch not given"))?,

View File

@@ -114,7 +114,7 @@ impl ScalarType {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ArchiverAppliance {
pub data_base_path: PathBuf,
pub data_base_paths: Vec<PathBuf>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]