diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 4d803be..5c24401 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -3,15 +3,21 @@ use crate::EventsItem; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; +use futures_util::StreamExt; use items::eventvalues::EventValues; -use items::{Framable, RangeCompletableItem, StreamItem}; +use items::{Framable, RangeCompletableItem, Sitemty, StreamItem}; +use netpod::log::*; use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::timeunits::{DAY, SEC}; use netpod::{ArchiverAppliance, Channel, ChannelInfo, ScalarType, Shape}; use serde_json::Value as JsonValue; +use std::collections::VecDeque; +use std::future::Future; +use std::marker::PhantomData; use std::path::PathBuf; use std::pin::Pin; +use std::task::{Context, Poll}; use tokio::fs::{read_dir, File}; struct DataFilename { @@ -39,19 +45,155 @@ fn parse_data_filename(s: &str) -> Result { Ok(ret) } +type SMI = Box; +type REFF = Pin>>>; + +struct StorageMerge { + inps: Vec> + Send>>>, + completed_inps: Vec, + current_inp_item: Vec>, +} + +impl StorageMerge { + fn refill_if_needed(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(Pin<&mut Self>, bool), Error> { + use Poll::*; + let mut is_pending = false; + for i in 0..self.inps.len() { + if self.current_inp_item[i].is_none() && self.completed_inps[i] == false { + match self.inps[i].poll_next_unpin(cx) { + Ready(j) => { + // + match j { + Some(j) => match j { + Ok(j) => match j { + StreamItem::DataItem(j) => match j { + RangeCompletableItem::Data(j) => { + self.current_inp_item[i] = Some(j); + } + RangeCompletableItem::RangeComplete => {} + }, + StreamItem::Log(_) => {} + StreamItem::Stats(_) => {} + }, + Err(e) => { + self.completed_inps[i] = true; + error!("inp err {:?}", e); + } + }, + None => { + // + self.completed_inps[i] = true; + } + } + } + Pending => { + is_pending = true; + } + } + } + } + Ok((self, is_pending)) + } + + fn decide_next_item(&mut self) -> Result>, Error> { + // TODO + // Keep index of how low priority is allowed. Start with all of the allowed. + // For all inputs, keep one event batch in tmp if available, or mark as exhausted. + let mut cursrc = self.inps.len() - 1; + + err::todoval() + } +} + +impl Stream for StorageMerge { + type Item = Sitemty; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let (mut self2, is_pending) = self.refill_if_needed(cx).unwrap(); + if is_pending { + Pending + } else { + match self2.decide_next_item() { + Ok(j) => Ready(j), + Err(e) => { + error!("impl Stream for StorageMerge {:?}", e); + panic!() + } + } + } + } +} + +trait FrameMakerTrait: Send { + fn make_frame(&self, ei: Sitemty) -> Box; +} + +struct FrameMaker { + _m1: PhantomData, +} + +impl FrameMakerTrait for FrameMaker +where + NTY: Send, +{ + fn make_frame(&self, ei: Sitemty) -> Box { + match ei { + Ok(j) => match j { + StreamItem::DataItem(j) => match j { + RangeCompletableItem::Data(j) => match j { + EventsItem::ScalarDouble(j) => { + let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(j))); + Box::new(b) + } + _ => panic!(), + }, + _ => panic!(), + }, + _ => panic!(), + }, + _ => panic!(), + } + } +} + pub async fn make_event_pipe( evq: &RawEventsQuery, aa: &ArchiverAppliance, ) -> Result> + Send>>, Error> { + let _channel_info = channel_info(&evq.channel, aa).await?; + let mut inps = vec![]; + for p1 in &aa.data_base_paths { + let p2 = p1.clone(); + let p3 = make_single_event_pipe(evq, p2).await?; + inps.push(p3); + } + let sm = StorageMerge { + current_inp_item: (0..inps.len()).into_iter().map(|_| None).collect(), + completed_inps: vec![false; inps.len()], + inps, + }; + let frame_maker = if true { + Box::new(FrameMaker:: { _m1: PhantomData }) as Box + } else { + Box::new(FrameMaker:: { _m1: PhantomData }) + }; + let ret = sm.map(move |j| frame_maker.make_frame(j)); + Ok(Box::pin(ret)) + //err::todoval() +} + +pub async fn make_single_event_pipe( + evq: &RawEventsQuery, + base_path: PathBuf, +) -> Result> + Send>>, Error> { info!("make_event_pipe {:?}", evq); let evq = evq.clone(); - let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, aa)?; - let channel_info = channel_info(&evq.channel, aa).await?; + let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, base_path)?; //let dtbeg = Utc.timestamp((evq.range.beg / 1000000000) as i64, (evq.range.beg % 1000000000) as u32); let (tx, rx) = async_channel::bounded(16); let block1 = async move { trace!("++++++++++++++++++++++++++++"); - info!("++++++++++++++++++++++++++++"); info!("start read of {:?}", dir); // TODO first collect all matching filenames, then sort, then open files. @@ -75,38 +217,10 @@ pub async fn make_event_pipe( info!("✓ read header {:?}", pbr.payload_type()); loop { match pbr.read_msg().await { - Ok(ev) => { - // + Ok(ei) => { info!("read msg from file"); - match ev { - EventsItem::ScalarDouble(h) => { - // - let (x, y) = h - .tss - .into_iter() - .zip(h.values.into_iter()) - .filter_map(|(j, k)| { - if j < evq.range.beg || j >= evq.range.end { - None - } else { - Some((j, k)) - } - }) - .fold((vec![], vec![]), |(mut a, mut b), (j, k)| { - a.push(j); - b.push(k); - (a, b) - }); - let b = EventValues { tss: x, values: y }; - let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(b))); - tx.send(Box::new(b) as Box).await?; - } - _ => { - // - error!("case not covered"); - return Err(Error::with_msg_no_trace("todo")); - } - } + let g = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ei))); + tx.send(g).await?; } Err(e) => { error!("error while reading msg {:?}", e); @@ -138,16 +252,47 @@ pub async fn make_event_pipe( Ok(Box::pin(rx)) } +fn events_item_to_framable(ei: EventsItem) -> Result, Error> { + match ei { + EventsItem::ScalarDouble(h) => { + /*let (x, y) = h + .tss + .into_iter() + .zip(h.values.into_iter()) + .filter_map(|(j, k)| { + if j < evq.range.beg || j >= evq.range.end { + None + } else { + Some((j, k)) + } + }) + .fold((vec![], vec![]), |(mut a, mut b), (j, k)| { + a.push(j); + b.push(k); + (a, b) + }); + let b = EventValues { tss: x, values: y };*/ + let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))); + let ret = Box::new(b); + Ok(ret) + } + _ => { + error!("case not covered"); + Err(Error::with_msg_no_trace("todo")) + } + } +} + struct DirAndPrefix { dir: PathBuf, prefix: String, } -fn directory_for_channel_files(channel: &Channel, aa: &ArchiverAppliance) -> Result { +fn directory_for_channel_files(channel: &Channel, base_path: PathBuf) -> Result { // SARUN11/CVME/DBLM546/IOC_CPU_LOAD // SARUN11-CVME-DBLM546:IOC_CPU_LOAD let a: Vec<_> = channel.name.split("-").map(|s| s.split(":")).flatten().collect(); - let path = aa.data_base_path.clone(); + let path = base_path; let path = a.iter().take(a.len() - 1).fold(path, |a, &x| a.join(x)); let ret = DirAndPrefix { dir: path, @@ -160,7 +305,8 @@ fn directory_for_channel_files(channel: &Channel, aa: &ArchiverAppliance) -> Res } pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result { - let DirAndPrefix { dir, prefix } = directory_for_channel_files(channel, aa)?; + let DirAndPrefix { dir, prefix } = + directory_for_channel_files(channel, aa.data_base_paths.first().unwrap().clone())?; let mut msgs = vec![]; msgs.push(format!("path: {}", dir.to_string_lossy())); let mut scalar_type = None; diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index 221f31b..e0a78a9 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -340,7 +340,7 @@ pub async fn scan_files_inner( let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?; let mut paths = VecDeque::new(); paths.push_back( - aa.data_base_path.join( + aa.data_base_paths.first().unwrap().join( pairs .get("subpath") .ok_or_else(|| Error::with_msg("subpatch not given"))?, diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 7b018db..c203c33 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -114,7 +114,7 @@ impl ScalarType { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ArchiverAppliance { - pub data_base_path: PathBuf, + pub data_base_paths: Vec, } #[derive(Clone, Debug, Serialize, Deserialize)]