From d1401bffd59118da3dd51e6900acc80b825871d5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 19 Jul 2021 22:33:39 +0200 Subject: [PATCH] More variants for archive pb format --- archapp/src/events.rs | 82 ++++++++++++++++------- archapp/src/lib.rs | 75 +++++++++++++++++++-- archapp/src/parse.rs | 133 ++++++++++++++++++++++++++----------- archapp_wrap/src/lib.rs | 4 +- archapp_xc/src/lib.rs | 2 +- disk/src/agg/eventbatch.rs | 2 - httpret/src/lib.rs | 2 +- items/src/eventvalues.rs | 2 - 8 files changed, 227 insertions(+), 75 deletions(-) diff --git a/archapp/src/events.rs b/archapp/src/events.rs index f93f814..01618fc 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -7,25 +7,21 @@ use futures_util::StreamExt; use items::eventvalues::EventValues; use items::{Framable, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; -use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::timeunits::{DAY, SEC}; use netpod::{ArchiverAppliance, Channel, ChannelInfo, NanoRange, ScalarType, Shape}; use serde_json::Value as JsonValue; -use std::collections::VecDeque; -use std::future::Future; -use std::marker::PhantomData; use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::{read_dir, File}; -struct DataFilename { +pub struct DataFilename { year: u32, month: u32, } -fn parse_data_filename(s: &str) -> Result { +pub fn parse_data_filename(s: &str) -> Result { if !s.ends_with(".pb") { return Err(Error::with_msg_no_trace("not a .pb file")); } @@ -45,13 +41,11 @@ fn parse_data_filename(s: &str) -> Result { Ok(ret) } -type SMI = Box; -type REFF = Pin>>>; - struct StorageMerge { inps: Vec> + Send>>>, completed_inps: Vec, current_inp_item: Vec>, + inprng: usize, } impl StorageMerge { @@ -96,12 +90,45 @@ impl StorageMerge { } fn decide_next_item(&mut self) -> Result>, Error> { - // TODO - // Keep index of how low priority is allowed. Start with all of the allowed. - // For all inputs, keep one event batch in tmp if available, or mark as exhausted. - let mut cursrc = self.inps.len() - 1; - - err::todoval() + let not_found = 999; + let mut i1 = self.inprng; + let mut j1 = not_found; + let mut tsmin = u64::MAX; + use items::{WithLen, WithTimestamps}; + loop { + if self.completed_inps[i1] { + } else { + match self.current_inp_item[i1].as_ref() { + None => panic!(), + Some(j) => { + if j.len() == 0 { + j1 = i1; + break; + } else { + let ts = j.ts(0); + if ts < tsmin { + tsmin = ts; + j1 = i1; + self.inprng = i1; + } else { + } + } + } + } + } + i1 -= 1; + if i1 == 0 { + break; + } + } + if j1 >= not_found { + Ok(None) + } else { + let j = self.current_inp_item[j1] + .take() + .map(|j| Ok(StreamItem::DataItem(RangeCompletableItem::Data(j)))); + Ok(j) + } } } @@ -143,7 +170,7 @@ macro_rules! events_item_to_sitemty { if let EventsItem::$var(j) = j { Ok(StreamItem::DataItem(RangeCompletableItem::Data(j))) } else { - panic!() + Err(Error::with_msg_no_trace("unexpected variant")) } } RangeCompletableItem::RangeComplete => { @@ -161,14 +188,19 @@ macro_rules! events_item_to_sitemty { impl FrameMakerTrait for FrameMaker { fn make_frame(&self, ei: Sitemty) -> Box { + // see also channel_info in this mod. match self.shape { Shape::Scalar => match self.scalar_type { + ScalarType::I8 => events_item_to_sitemty!(ei, ScalarByte), + ScalarType::I16 => events_item_to_sitemty!(ei, ScalarShort), ScalarType::I32 => events_item_to_sitemty!(ei, ScalarInt), ScalarType::F32 => events_item_to_sitemty!(ei, ScalarFloat), ScalarType::F64 => events_item_to_sitemty!(ei, ScalarDouble), _ => panic!(), }, Shape::Wave(_) => match self.scalar_type { + ScalarType::I8 => events_item_to_sitemty!(ei, WaveByte), + ScalarType::I16 => events_item_to_sitemty!(ei, WaveShort), ScalarType::I32 => events_item_to_sitemty!(ei, WaveInt), ScalarType::F32 => events_item_to_sitemty!(ei, WaveFloat), ScalarType::F64 => events_item_to_sitemty!(ei, WaveDouble), @@ -190,6 +222,7 @@ pub async fn make_event_pipe( inps.push(p3); } let sm = StorageMerge { + inprng: inps.len() - 1, current_inp_item: (0..inps.len()).into_iter().map(|_| None).collect(), completed_inps: vec![false; inps.len()], inps, @@ -271,6 +304,7 @@ pub async fn make_single_event_pipe( Ok(Box::pin(rx)) } +#[allow(unused)] fn events_item_to_framable(ei: EventsItem) -> Result, Error> { match ei { EventsItem::ScalarDouble(h) => { @@ -326,7 +360,7 @@ fn directory_for_channel_files(channel: &Channel, base_path: PathBuf) -> Result< pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result { let DirAndPrefix { dir, prefix } = - directory_for_channel_files(channel, aa.data_base_paths.first().unwrap().clone())?; + directory_for_channel_files(channel, aa.data_base_paths.last().unwrap().clone())?; let mut msgs = vec![]; msgs.push(format!("path: {}", dir.to_string_lossy())); let mut scalar_type = None; @@ -350,6 +384,7 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result Shape::Scalar, EventsItem::ScalarFloat(_) => Shape::Scalar, EventsItem::ScalarDouble(_) => Shape::Scalar, + // TODO use macro: EventsItem::WaveByte(item) => Shape::Wave( item.vals .first() @@ -383,13 +418,13 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result ScalarType::U8, - EventsItem::ScalarShort(_) => ScalarType::I32, + EventsItem::ScalarByte(_) => ScalarType::I8, + EventsItem::ScalarShort(_) => ScalarType::I16, EventsItem::ScalarInt(_) => ScalarType::I32, EventsItem::ScalarFloat(_) => ScalarType::F32, EventsItem::ScalarDouble(_) => ScalarType::F64, - EventsItem::WaveByte(_) => ScalarType::U8, - EventsItem::WaveShort(_) => ScalarType::I32, + EventsItem::WaveByte(_) => ScalarType::I8, + EventsItem::WaveShort(_) => ScalarType::I16, EventsItem::WaveInt(_) => ScalarType::I32, EventsItem::WaveFloat(_) => ScalarType::F32, EventsItem::WaveDouble(_) => ScalarType::F64, @@ -403,8 +438,9 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result Result, Error> { #[derive(Debug)] pub enum EventsItem { - ScalarByte(EventValues), - ScalarShort(EventValues), + ScalarByte(EventValues), + ScalarShort(EventValues), ScalarInt(EventValues), ScalarFloat(EventValues), ScalarDouble(EventValues), - WaveByte(WaveEvents), - WaveShort(WaveEvents), + WaveByte(WaveEvents), + WaveShort(WaveEvents), WaveInt(WaveEvents), WaveFloat(WaveEvents), WaveDouble(WaveEvents), } + +impl EventsItem { + pub fn is_wave(&self) -> bool { + use EventsItem::*; + match self { + WaveByte(_) => true, + WaveShort(_) => true, + WaveInt(_) => true, + WaveFloat(_) => true, + WaveDouble(_) => true, + _ => false, + } + } + + pub fn variant_name(&self) -> String { + use EventsItem::*; + match self { + ScalarByte(item) => format!("ScalarByte"), + ScalarShort(item) => format!("ScalarShort"), + ScalarInt(item) => format!("ScalarInt"), + ScalarFloat(item) => format!("ScalarFloat"), + ScalarDouble(item) => format!("ScalarDouble"), + WaveByte(item) => format!("WaveByte({})", item.len()), + WaveShort(item) => format!("WaveShort({})", item.len()), + WaveInt(item) => format!("WaveInt({})", item.len()), + WaveFloat(item) => format!("WaveFloat({})", item.len()), + WaveDouble(item) => format!("WaveDouble({})", item.len()), + } + } +} + +impl WithLen for EventsItem { + fn len(&self) -> usize { + use EventsItem::*; + match self { + ScalarByte(j) => j.len(), + ScalarShort(j) => j.len(), + ScalarInt(j) => j.len(), + ScalarFloat(j) => j.len(), + ScalarDouble(j) => j.len(), + WaveByte(j) => j.len(), + WaveShort(j) => j.len(), + WaveInt(j) => j.len(), + WaveFloat(j) => j.len(), + WaveDouble(j) => j.len(), + } + } +} + +impl WithTimestamps for EventsItem { + fn ts(&self, ix: usize) -> u64 { + use EventsItem::*; + match self { + ScalarByte(j) => j.ts(ix), + ScalarShort(j) => j.ts(ix), + ScalarInt(j) => j.ts(ix), + ScalarFloat(j) => j.ts(ix), + ScalarDouble(j) => j.ts(ix), + WaveByte(j) => j.ts(ix), + WaveShort(j) => j.ts(ix), + WaveInt(j) => j.ts(ix), + WaveFloat(j) => j.ts(ix), + WaveDouble(j) => j.ts(ix), + } + } +} diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index e0a78a9..12fcc04 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -1,3 +1,4 @@ +use crate::events::parse_data_filename; use crate::generated::EPICSEvent::PayloadType; use crate::{unescape_archapp_msg, EventsItem}; use archapp_xc::*; @@ -12,7 +13,10 @@ use protobuf::Message; use serde::Serialize; use serde_json::Value as JsonValue; use std::collections::{BTreeMap, VecDeque}; +use std::fs::FileType; +use std::path::PathBuf; use std::sync::Arc; +use std::time::Instant; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -214,6 +218,7 @@ pub struct EpicsEventPayloadInfo { val0: f32, } +// TODO remove in favor of PbFileRead async fn read_pb_file(mut f1: File) -> Result<(EpicsEventPayloadInfo, File), Error> { let mut buf = vec![0; 1024 * 4]; { @@ -323,14 +328,41 @@ async fn read_pb_file(mut f1: File) -> Result<(EpicsEventPayloadInfo, File), Err Err(Error::with_msg(format!("no data found in file"))) } +struct LruCache { + map: BTreeMap, +} + +impl LruCache { + fn new() -> Self { + Self { map: BTreeMap::new() } + } + + fn insert(&mut self, key: &str) { + self.map.insert(key.into(), Instant::now()); + if self.map.len() > 2000 { + let mut tss: Vec = self.map.values().map(|j| j.clone()).collect(); + tss.sort_unstable(); + let thr = tss[1500]; + let m1 = std::mem::replace(&mut self.map, BTreeMap::new()); + self.map = m1.into_iter().filter(|(j, k)| k > &thr).collect(); + } + } + + fn query(&self, key: &str) -> bool { + self.map.get(key).map_or(false, |_| true) + } +} + pub async fn scan_files_inner( pairs: BTreeMap, node_config: NodeConfigCached, -) -> Result>, Error> { +) -> Result>, Error> { + let _ = pairs; let (tx, rx) = bounded(16); let tx = Arc::new(tx); let tx2 = tx.clone(); let block1 = async move { + let mut lru = LruCache::new(); let aa = if let Some(aa) = &node_config.node.archiver_appliance { aa.clone() } else { @@ -338,24 +370,31 @@ pub async fn scan_files_inner( }; let dbc = dbconn::create_connection(&node_config.node_config.cluster.database).await?; let ndi = dbconn::scan::get_node_disk_ident(&node_config, &dbc).await?; + struct PE { + path: PathBuf, + fty: FileType, + } + let proot = aa.data_base_paths.last().unwrap().clone(); + let proots = proot.to_str().unwrap().to_string(); + let meta = tokio::fs::metadata(&proot).await?; let mut paths = VecDeque::new(); - paths.push_back( - aa.data_base_paths.first().unwrap().join( - pairs - .get("subpath") - .ok_or_else(|| Error::with_msg("subpatch not given"))?, - ), - ); + let mut waves_found = 0; + paths.push_back(PE { + path: proot, + fty: meta.file_type(), + }); loop { - if let Some(path) = paths.pop_back() { - let meta = tokio::fs::metadata(&path).await?; - if meta.is_dir() { - let mut rd = tokio::fs::read_dir(&path).await?; + if let Some(pe) = paths.pop_back() { + if pe.fty.is_dir() { + let mut rd = tokio::fs::read_dir(&pe.path).await?; loop { match rd.next_entry().await { Ok(item) => match item { Some(item) => { - paths.push_back(item.path()); + paths.push_back(PE { + path: item.path(), + fty: item.file_type().await?, + }); } None => { break; @@ -366,35 +405,49 @@ pub async fn scan_files_inner( } } } - } else if meta.is_file() { + } else if pe.fty.is_file() { //tx.send(Ok(Box::new(path.clone()) as RT1)).await?; - if path - .to_str() - .ok_or_else(|| Error::with_msg("invalid path string"))? - .ends_with(".pb") - { - let f1 = tokio::fs::File::open(&path).await?; - let (packet, f1) = read_pb_file(f1).await?; - let pvn = packet.pvname.replace("-", "/"); - let pvn = pvn.replace(":", "/"); - let pre = "/arch/lts/ArchiverStore/"; - let p3 = &path.to_str().unwrap()[pre.len()..]; - let p3 = &p3[..p3.len() - 11]; - if p3 != pvn { - tx.send(Ok(Box::new(serde_json::to_value(&packet)?) as RT1)).await?; - { - let s = format!("{} - {}", p3, packet.pvname); - tx.send(Ok(Box::new(serde_json::to_value(&s)?) as RT1)).await?; + let fns = pe.path.to_str().ok_or_else(|| Error::with_msg("invalid path string"))?; + if let Ok(fnp) = parse_data_filename(&fns) { + tx.send(Ok(Box::new(serde_json::to_value(fns)?) as ItemSerBox)).await?; + let channel_path = &fns[proots.len() + 1..fns.len() - 11]; + if !lru.query(channel_path) { + let mut pbr = PbFileReader::new(tokio::fs::File::open(&pe.path).await?).await; + pbr.read_header().await?; + let normalized_channel_name = { + let pvn = pbr.channel_name().replace("-", "/"); + pvn.replace(":", "/") + }; + if channel_path != normalized_channel_name { + { + let s = format!("{} - {}", channel_path, normalized_channel_name); + tx.send(Ok(Box::new(serde_json::to_value(&s)?) as ItemSerBox)).await?; + } + tx.send(Ok( + Box::new(JsonValue::String(format!("MISMATCH --------------------"))) as ItemSerBox, + )) + .await?; + } else { + if false { + dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?; + } + if let Ok(msg) = pbr.read_msg().await { + if msg.is_wave() { + tx.send(Ok(Box::new(serde_json::to_value(format!( + "found {} {}", + msg.variant_name(), + channel_path + ))?) as ItemSerBox)) + .await?; + waves_found += 1; + if waves_found >= 20 { + break; + } + } + } else { + } } - tx.send(Ok( - Box::new(JsonValue::String(format!("MISMATCH --------------------"))) as RT1, - )) - .await?; - } else { - if false { - dbconn::insert_channel(packet.pvname.clone(), ndi.facility, &dbc).await?; - } - tx.send(Ok(Box::new(serde_json::to_value(&packet)?) as RT1)).await?; + lru.insert(channel_path); } } } diff --git a/archapp_wrap/src/lib.rs b/archapp_wrap/src/lib.rs index 0fa2662..7655e0f 100644 --- a/archapp_wrap/src/lib.rs +++ b/archapp_wrap/src/lib.rs @@ -1,4 +1,4 @@ -use archapp_xc::RT1; +use archapp_xc::ItemSerBox; use async_channel::Receiver; use err::Error; use futures_core::Stream; @@ -12,7 +12,7 @@ use std::pin::Pin; pub fn scan_files( pairs: BTreeMap, node_config: NodeConfigCached, -) -> Pin>, Error>> + Send>> { +) -> Pin>, Error>> + Send>> { Box::pin(archapp::parse::scan_files_inner(pairs, node_config)) } diff --git a/archapp_xc/src/lib.rs b/archapp_xc/src/lib.rs index 173bb57..34d90bc 100644 --- a/archapp_xc/src/lib.rs +++ b/archapp_xc/src/lib.rs @@ -1,7 +1,7 @@ use err::Error; use serde::Serialize; -pub type RT1 = Box; +pub type ItemSerBox = Box; pub trait ItemSer { fn serialize(&self) -> Result, Error>; diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 3f79e40..12cd7a5 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -1,6 +1,4 @@ use bytes::{BufMut, Bytes, BytesMut}; -use err::Error; -use items::frame::make_frame; use items::{Appendable, RangeOverlapInfo, SitemtyFrameType}; use netpod::log::*; use netpod::NanoRange; diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 9617b78..2288c95 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -220,7 +220,7 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/4/archapp/files" { + } else if path == "/api/4/archapp/files/scan" { if req.method() == Method::GET { Ok(archapp_scan_files(req, &node_config).await?) } else { diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs index d831f11..2c86582 100644 --- a/items/src/eventvalues.rs +++ b/items/src/eventvalues.rs @@ -1,4 +1,3 @@ -use crate::frame::{make_frame, make_frame_2}; use crate::minmaxavgbins::MinMaxAvgBins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; @@ -6,7 +5,6 @@ use crate::{ ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; -use bytes::BytesMut; use err::Error; use netpod::NanoRange; use serde::{Deserialize, Serialize};