From 15c6e1f6eb29103a288cef3df5baeca914d729fd Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 30 Oct 2021 19:30:25 +0200 Subject: [PATCH] Read data blocks --- archapp/src/archeng.rs | 1 + archapp/src/archeng/blockstream.rs | 186 +++++++++++++++++++++++++++++ archapp/src/archeng/datablock.rs | 2 +- archapp/src/archeng/indextree.rs | 12 +- httpret/src/channelarchiver.rs | 53 +++++++- httpret/src/lib.rs | 2 + taskrun/src/lib.rs | 3 +- 7 files changed, 252 insertions(+), 7 deletions(-) create mode 100644 archapp/src/archeng/blockstream.rs diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index 760eb96..72c9c45 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -1,3 +1,4 @@ +pub mod blockstream; pub mod datablock; pub mod datablockstream; pub mod diskio; diff --git a/archapp/src/archeng/blockstream.rs b/archapp/src/archeng/blockstream.rs new file mode 100644 index 0000000..6f75630 --- /dev/null +++ b/archapp/src/archeng/blockstream.rs @@ -0,0 +1,186 @@ +use crate::archeng::datablock::{read_data_1, read_datafile_header}; +use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec}; +use crate::archeng::indextree::{read_datablockref, IndexFileBasics, RecordIter, RecordTarget}; +use crate::archeng::{open_read, seek, StatsChannel}; +use err::Error; +use futures_core::{Future, Stream}; +#[allow(unused)] +use netpod::log::*; +use netpod::{Channel, ChannelArchiver, FilePos, NanoRange}; +use serde_json::Value as JsVal; +use std::collections::{BTreeMap, VecDeque}; +use std::io::SeekFrom; +use std::path::PathBuf; +use std::pin::Pin; +use tokio::fs::File; + +use super::indextree::HeaderVersion; + +enum Steps { + Start, + SelectIndexFile, + SetupNextPath, + ReadBlocks(RecordIter, Box, PathBuf), + Done, +} + +struct DataBlocks { + conf: ChannelArchiver, + channel: Channel, + range: NanoRange, + steps: Steps, + paths: VecDeque, + file1: Option, + file2: Option, + last_dp: u64, + last_dp2: u64, + last_f2: String, + dfnotfound: BTreeMap, +} + +impl DataBlocks { + fn new(channel: Channel, range: NanoRange, conf: ChannelArchiver) -> Self { + Self { + conf, + channel, + range, + steps: Steps::Start, + paths: VecDeque::new(), + file1: None, + file2: None, + last_dp: 0, + last_dp2: 0, + last_f2: String::new(), + dfnotfound: BTreeMap::new(), + } + } + + async fn exec(mut self) -> Result, Error> { + use Steps::*; + match self.steps { + Start => { + self.steps = SelectIndexFile; + Ok(Some((JsVal::Null, self))) + } + SelectIndexFile => { + let dbc = database_connect(&self.conf.database).await?; + let sql = "select path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index"; + let rows = dbc.query(sql, &[&self.channel.name()]).await?; + for row in rows { + self.paths.push_back(row.try_get(0)?); + } + self.steps = SetupNextPath; + Ok(Some((JsVal::String(format!("INIT")), self))) + } + SetupNextPath => { + let stats = &StatsChannel::dummy(); + // For simplicity, simply read all storage classes linearly. + if let Some(path) = self.paths.pop_front() { + // TODO + let basics = IndexFileBasics::from_path(&path, stats).await?; + let mut tree = basics + .rtree_for_channel(self.channel.name(), stats) + .await? + .ok_or_else(|| Error::with_msg_no_trace("channel not in index files"))?; + if let Some(iter) = tree.iter_range(self.range.clone(), stats).await? { + self.steps = ReadBlocks(iter, basics.hver().duplicate(), path.clone().into()); + self.file1 = Some(open_read(path.into(), stats).await?); + } else { + self.steps = SetupNextPath; + }; + } else { + self.steps = Done; + } + Ok(Some((JsVal::String(format!("NEXTPATH")), self))) + } + ReadBlocks(ref mut iter, ref hver, ref indexpath) => { + // TODO stats + let stats = &StatsChannel::dummy(); + // TODO I need to keep some datafile open. + let item = if let Some(rec) = iter.next().await? { + // TODO the iterator should actually return Dataref. We never expect child nodes here. + if let RecordTarget::Dataref(dp) = rec.target { + let f1 = self.file1.as_mut().unwrap(); + //seek(f1, SeekFrom::Start(dp.0), stats).await?; + // Read the dataheader... + // TODO the function should take a DatarefPos or? + // TODO the seek is hidden in the function which makes possible optimization not accessible. + let dref = read_datablockref(f1, FilePos { pos: dp.0 }, hver.as_ref(), stats).await?; + // TODO Remember the index path, need it here for relative path. + // TODO open datafile, relative path to index path. + // TODO keep open when path does not change. + let acc; + let num_samples; + if let Some(_) = self.dfnotfound.get(dref.file_name()) { + num_samples = 0; + acc = 1; + } else { + if dref.file_name() == self.last_f2 { + acc = 2; + } else { + let dpath = indexpath.parent().unwrap().join(dref.file_name()); + match open_read(dpath, stats).await { + Ok(f2) => { + acc = 4; + self.file2 = Some(f2); + self.last_f2 = dref.file_name().into(); + } + Err(_) => { + acc = 3; + self.file2 = None; + } + } + }; + if let Some(f2) = self.file2.as_mut() { + let dfheader = read_datafile_header(f2, dref.data_header_pos(), stats).await?; + num_samples = dfheader.num_samples; + } else { + self.dfnotfound.insert(dref.file_name().into(), true); + num_samples = 0; + }; + } + let item = serde_json::to_value(( + dp.0, + dp.0 as i64 - self.last_dp as i64, + dref.file_name(), + dref.data_header_pos.0, + dref.data_header_pos.0 as i64 - self.last_dp2 as i64, + dref.next().0, + acc, + num_samples, + ))?; + self.last_dp = dp.0; + self.last_dp2 = dref.data_header_pos.0; + item + } else { + panic!(); + } + } else { + self.steps = SetupNextPath; + JsVal::String(format!("NOMORE")) + }; + Ok(Some((item, self))) + } + Done => Ok(None), + } + } +} + +impl UnfoldExec for DataBlocks { + type Output = JsVal; + + fn exec(self) -> Pin, Error>> + Send>> + where + Self: Sized, + { + Box::pin(self.exec()) + } +} + +pub fn blockstream( + channel: Channel, + range: NanoRange, + conf: ChannelArchiver, +) -> impl Stream> { + unfold_stream(DataBlocks::new(channel, range, conf.clone())) +} diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs index d58808a..6a4b317 100644 --- a/archapp/src/archeng/datablock.rs +++ b/archapp/src/archeng/datablock.rs @@ -60,7 +60,7 @@ pub struct DatafileHeader { next_offset: u32, prev_offset: u32, curr_offset: u32, - num_samples: u32, + pub num_samples: u32, ctrl_info_offset: u32, buf_size: u32, buf_free: u32, diff --git a/archapp/src/archeng/indextree.rs b/archapp/src/archeng/indextree.rs index 88efb51..b9868c3 100644 --- a/archapp/src/archeng/indextree.rs +++ b/archapp/src/archeng/indextree.rs @@ -105,6 +105,10 @@ impl IndexFileBasics { let file = open_read(path.clone(), stats).await?; read_file_basics(path, file, stats).await } + + pub fn hver(&self) -> &Box { + &self.hver + } } pub async fn read_file_basics(path: PathBuf, file: File, stats: &StatsChannel) -> Result { @@ -634,7 +638,7 @@ impl RecordIter { self.stack.push_back(nr3); } RecordTarget::Dataref(_) => { - info!("loop B is-leaf"); + trace!("loop B is-leaf"); // done, we've positioned the next result. break; } @@ -901,9 +905,9 @@ fn parse_name_hash_channel_entry(buf: &[u8], hver: &dyn HeaderVersion) -> Result #[derive(Debug)] pub struct Dataref { - next: DatarefPos, - data_header_pos: DataheaderPos, - fname: String, + pub next: DatarefPos, + pub data_header_pos: DataheaderPos, + pub fname: String, } impl Dataref { diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs index 08f5ed6..ba43d24 100644 --- a/httpret/src/channelarchiver.rs +++ b/httpret/src/channelarchiver.rs @@ -4,7 +4,7 @@ use futures_core::Stream; use futures_util::StreamExt; use http::{header, Method, Request, Response, StatusCode}; use hyper::Body; -use netpod::log::*; +use netpod::{log::*, Channel, NanoRange}; use netpod::{NodeConfigCached, APP_JSON_LINES}; use serde::Serialize; @@ -155,6 +155,57 @@ impl ScanChannels { } } +pub struct BlockStream {} + +impl BlockStream { + pub fn prefix() -> &'static str { + "/api/4/channelarchiver/blockstream" + } + + pub fn name() -> &'static str { + "BlockStream" + } + + pub fn should_handle(path: &str) -> Option { + if path.starts_with(Self::prefix()) { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + info!("{} handle uri: {:?}", Self::name(), req.uri()); + let conf = node_config + .node + .channel_archiver + .as_ref() + .ok_or(Error::with_msg_no_trace( + "this node is not configured as channel archiver", + ))?; + let range = NanoRange { beg: 0, end: u64::MAX }; + let channel = Channel { + backend: "".into(), + name: "ARIDI-PCT:CURRENT".into(), + }; + let s = archapp_wrap::archapp::archeng::blockstream::blockstream(channel, range, conf.clone()); + let s = json_lines_stream(s); + let s = s.map(|item| match item { + Ok(k) => Ok(k), + Err(e) => { + error!("observe error: {}", e); + Err(e) + } + }); + Ok(response(StatusCode::OK) + .header(header::CONTENT_TYPE, APP_JSON_LINES) + .body(Body::wrap_stream(s))?) + } +} + pub struct ListChannelsHttpFunction {} impl ListChannelsHttpFunction { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 06c5fbe..455e1be 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -290,6 +290,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> h.handle(req, &node_config).await } else if let Some(h) = channelarchiver::ScanChannels::should_handle(path) { h.handle(req, &node_config).await + } else if let Some(h) = channelarchiver::BlockStream::should_handle(path) { + h.handle(req, &node_config).await } else if path.starts_with("/api/1/requestStatus/") { info!("{}", path); Ok(response(StatusCode::OK).body(Body::from("{}"))?) diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 4e22632..0c4e7ea 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -94,7 +94,8 @@ pub fn tracing_init() { "info", "archapp::archeng=info", "archapp::archeng::datablockstream=info", - "archapp::archeng::indextree=trace", + "archapp::archeng::indextree=info", + "archapp::archeng::blockstream=trace", "archapp::storagemerge=info", "daqbuffer::test=trace", ]