From 4d3965660c4fe548071d8b1668f8a33f6cfed44c Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 2 Nov 2021 09:08:58 +0100
Subject: [PATCH] Test on the VM with NFS again confirms low IOPS

---
 archapp/src/archeng.rs                |   1 +
 archapp/src/archeng/backreadbuf.rs    |  28 +-
 archapp/src/archeng/blockrefstream.rs | 242 ++++++++++++
 archapp/src/archeng/blockstream.rs    | 537 ++++++++++++++++----------
 archapp/src/archeng/datablock.rs      |   2 +-
 archapp/src/archeng/ringbuf.rs        |  28 +-
 httpret/src/channelarchiver.rs        |  80 +++-
 httpret/src/lib.rs                    |   2 +
 8 files changed, 707 insertions(+), 213 deletions(-)
 create mode 100644 archapp/src/archeng/blockrefstream.rs

diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs
index 7c526fa..c02a708 100644
--- a/archapp/src/archeng.rs
+++ b/archapp/src/archeng.rs
@@ -1,4 +1,5 @@
 pub mod backreadbuf;
+pub mod blockrefstream;
 pub mod blockstream;
 pub mod bufminread;
 pub mod datablock;
diff --git a/archapp/src/archeng/backreadbuf.rs b/archapp/src/archeng/backreadbuf.rs
index 4c97709..27292ae 100644
--- a/archapp/src/archeng/backreadbuf.rs
+++ b/archapp/src/archeng/backreadbuf.rs
@@ -1,7 +1,9 @@
 use crate::archeng::{read, seek, StatsChannel};
 use err::Error;
 use netpod::log::*;
-use std::{borrow::BorrowMut, io::SeekFrom};
+use std::borrow::BorrowMut;
+use std::fmt;
+use std::io::SeekFrom;
 use tokio::fs::File;
 
 pub struct BackReadBuf<F> {
@@ -106,13 +108,21 @@ where
     }
 }
 
-impl<F> Drop for BackReadBuf<F> {
-    fn drop(&mut self) {
-        info!(
-            "BackReadBuf Drop {} {}% {}",
-            self.seek_request,
-            self.seek_done * 100 / self.seek_request,
-            self.read_done
-        );
+impl<F> fmt::Debug for BackReadBuf<F> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("BackReadBuf")
+            .field("abs", &self.abs)
+            .field("wp", &self.wp)
+            .field("rp", &self.rp)
+            .field("seek_request", &self.seek_request)
+            .field("seek_done", &self.seek_done)
+            .field("read_done", &self.read_done)
+            .finish()
+    }
+}
+
+impl<F> Drop for BackReadBuf<F> {
+    fn drop(&mut self) {
+        info!("Drop {:?}", self);
     }
 }
diff --git a/archapp/src/archeng/blockrefstream.rs b/archapp/src/archeng/blockrefstream.rs
new file mode 100644
index 0000000..99cbaa7
--- /dev/null
+++ b/archapp/src/archeng/blockrefstream.rs
@@ -0,0 +1,242 @@
+use crate::archeng::backreadbuf::BackReadBuf;
+use crate::archeng::datablock::{read_data2, read_data_1, read_datafile_header, read_datafile_header2};
+use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec};
+use crate::archeng::indextree::{
+    read_datablockref, read_datablockref2, DataheaderPos, Dataref, HeaderVersion, IndexFileBasics, RecordIter,
+    RecordTarget,
+};
+use crate::archeng::ringbuf::RingBuf;
+use crate::archeng::{open_read, seek, StatsChannel};
+use err::Error;
+use futures_core::{Future, Stream};
+use items::WithLen;
+#[allow(unused)]
+use netpod::log::*;
+use netpod::{Channel, ChannelArchiver, FilePos, NanoRange};
+use serde::Serialize;
+use serde_json::Value as JsVal;
+use std::collections::{BTreeMap, VecDeque};
+use std::io::SeekFrom;
+use std::path::PathBuf;
+use std::pin::Pin;
+use tokio::fs::File;
+
+#[derive(Debug)]
+pub struct Blockref {
+    pub dref: Dataref,
+    pub dpath: PathBuf,
+}
+
+#[derive(Debug)]
+pub enum BlockrefItem {
+    Blockref(Blockref, JsVal),
+    JsVal(JsVal),
+}
+
+enum Steps {
+    Start,
+    SelectIndexFile,
+    SetupNextPath,
+    ReadBlocks(RecordIter, Box<dyn HeaderVersion>, PathBuf),
+    Done,
+}
+
+struct BlockrefStream {
+    conf: ChannelArchiver,
+    channel: Channel,
+    range: NanoRange,
+    steps: Steps,
+    paths: VecDeque<String>,
+    file1: Option<BackReadBuf<File>>,
+    file2: Option<RingBuf<File>>,
+    last_dp: u64,
+    last_dp2: u64,
+    last_f2: String,
+    last_dfhpos: DataheaderPos,
+    dfnotfound: BTreeMap<String, bool>,
+    data_bytes_read: u64,
+    same_dfh_count: u64,
+}
+
+impl BlockrefStream {
+    fn new(channel: Channel, range: NanoRange, conf: ChannelArchiver) -> Self {
+        Self {
+            conf,
+            channel,
+            range,
+            steps: Steps::Start,
+            paths: VecDeque::new(),
+            file1: None,
+            file2: None,
+            last_dp: 0,
+            last_dp2: 0,
+            last_f2: String::new(),
+            last_dfhpos: DataheaderPos(u64::MAX),
+            dfnotfound: BTreeMap::new(),
+            data_bytes_read: 0,
+            same_dfh_count: 0,
+        }
+    }
+
+    async fn exec(mut self) -> Result<Option<(BlockrefItem, Self)>, Error> {
+        use Steps::*;
+        match self.steps {
+            Start => {
+                self.steps = SelectIndexFile;
+                Ok(Some((BlockrefItem::JsVal(JsVal::Null), self)))
+            }
+            SelectIndexFile => {
+                let dbc = database_connect(&self.conf.database).await?;
+                let sql = "select path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
+                let rows = dbc.query(sql, &[&self.channel.name()]).await?;
+                for row in rows {
+                    self.paths.push_back(row.try_get(0)?);
+                }
+                self.steps = SetupNextPath;
+                Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("INIT"))), self)))
+            }
+            SetupNextPath => {
+                let stats = &StatsChannel::dummy();
+                // For simplicity, simply read all storage classes linearly.
+                if let Some(path) = self.paths.pop_front() {
+                    // TODO
+                    let mut file = open_read(path.clone().into(), stats).await?;
+                    let basics = IndexFileBasics::from_file(&path, &mut file, stats).await?;
+                    let mut tree = basics
+                        .rtree_for_channel(self.channel.name(), stats)
+                        .await?
+                        .ok_or_else(|| Error::with_msg_no_trace("channel not in index files"))?;
+                    if let Some(iter) = tree.iter_range(self.range.clone(), stats).await? {
+                        debug!("SetupNextPath {:?}", path);
+                        self.steps = ReadBlocks(iter, basics.hver().duplicate(), path.clone().into());
+                        self.file1 = Some(BackReadBuf::new(file, 0, stats.clone()).await?);
+                    } else {
+                        self.steps = SetupNextPath;
+                    };
+                    Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("NEXTPATH"))), self)))
+                } else {
+                    self.steps = Done;
+                    Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("DONE"))), self)))
+                }
+            }
+            ReadBlocks(ref mut iter, ref hver, ref indexpath) => {
+                // TODO stats
+                let stats = &StatsChannel::dummy();
+                // TODO I need to keep some datafile open.
+                let item = if let Some(rec) = iter.next().await? {
+                    // TODO the iterator should actually return Dataref. We never expect child nodes here.
+                    if let RecordTarget::Dataref(dp) = rec.target {
+                        let f1 = self.file1.as_mut().unwrap();
+                        let dref = read_datablockref2(f1, dp.clone(), hver.as_ref()).await?;
+                        let dpath = indexpath.parent().unwrap().join(dref.file_name());
+                        // TODO Remember the index path, need it here for relative path.
+                        // TODO open datafile, relative path to index path.
+                        // TODO keep open when path does not change.
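+                        // `acc` (set below) is a small diagnostic code for how the
+                        // datafile was resolved: 1 = file known missing, 2 = same file
+                        // as the previous block ref, 3 = open failed, 4 = newly opened,
+                        // 6 = datafile lookup disabled (currently always, see `if false`).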
+                        let acc;
+                        let num_samples;
+                        if false {
+                            if let Some(_) = self.dfnotfound.get(dref.file_name()) {
+                                num_samples = 0;
+                                acc = 1;
+                            } else {
+                                if dref.file_name() == self.last_f2 {
+                                    acc = 2;
+                                } else {
+                                    match open_read(dpath.clone(), stats).await {
+                                        Ok(f2) => {
+                                            acc = 4;
+                                            self.file2 = Some(
+                                                RingBuf::new(f2, dref.data_header_pos().0, StatsChannel::dummy())
+                                                    .await?,
+                                            );
+                                            self.last_f2 = dref.file_name().into();
+                                        }
+                                        Err(_) => {
+                                            acc = 3;
+                                            self.file2 = None;
+                                        }
+                                    }
+                                };
+                                if let Some(f2) = self.file2.as_mut() {
+                                    if dref.file_name() == self.last_f2 && dref.data_header_pos() == self.last_dfhpos {
+                                        num_samples = 0;
+                                    } else {
+                                        self.last_dfhpos = dref.data_header_pos();
+                                        let rp1 = f2.rp_abs();
+                                        let dfheader = read_datafile_header2(f2, dref.data_header_pos()).await?;
+                                        let data = read_data2(f2, &dfheader, self.range.clone(), false).await?;
+                                        let rp2 = f2.rp_abs();
+                                        self.data_bytes_read += rp2 - rp1;
+                                        num_samples = dfheader.num_samples;
+                                        if data.len() != num_samples as usize {
+                                            if (data.len() as i64 - num_samples as i64).abs() < 4 {
+                                                // TODO we always get one event fewer than num_samples indicates.
+                                                //warn!("small deviation {} vs {}", data.len(), num_samples);
+                                            } else {
+                                                return Err(Error::with_msg_no_trace(format!(
+                                                    "event count mismatch {} vs {}",
+                                                    data.len(),
+                                                    num_samples
+                                                )));
+                                            }
+                                        }
+                                    }
+                                } else {
+                                    self.dfnotfound.insert(dref.file_name().into(), true);
+                                    num_samples = 0;
+                                };
+                            }
+                        } else {
+                            acc = 6;
+                            num_samples = 0;
+                        }
+                        let jsval = serde_json::to_value((
+                            dp.0,
+                            dp.0 as i64 - self.last_dp as i64,
+                            dref.file_name(),
+                            dref.data_header_pos.0,
+                            dref.data_header_pos.0 as i64 - self.last_dp2 as i64,
+                            dref.next().0,
+                            acc,
+                            num_samples,
+                        ))?;
+                        self.last_dp = dp.0;
+                        self.last_dp2 = dref.data_header_pos.0;
+                        let bref = Blockref { dref, dpath };
+                        BlockrefItem::Blockref(bref, jsval)
+                    } else {
+                        panic!();
+                    }
+                } else {
+                    info!(
+                        "data_bytes_read: {} same_dfh_count: {}",
+                        self.data_bytes_read, self.same_dfh_count
+                    );
+                    self.steps = SetupNextPath;
+                    BlockrefItem::JsVal(JsVal::String(format!("NOMORE")))
+                };
+                Ok(Some((item, self)))
+            }
+            Done => Ok(None),
+        }
+    }
+}
+
+impl UnfoldExec for BlockrefStream {
+    type Output = BlockrefItem;
+
+    fn exec(self) -> Pin<Box<dyn Future<Output = Result<Option<(Self::Output, Self)>, Error>> + Send>>
+    where
+        Self: Sized,
+    {
+        Box::pin(self.exec())
+    }
+}
+
+pub fn blockref_stream(
+    channel: Channel,
+    range: NanoRange,
+    conf: ChannelArchiver,
+) -> impl Stream<Item = Result<BlockrefItem, Error>> {
+    unfold_stream(BlockrefStream::new(channel, range, conf.clone()))
+}
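Aside: a minimal sketch of consuming the new blockref stream directly from Rust
(assumes the crate paths used by httpret further below; `dump_blockrefs` is an
illustrative name, not part of this patch):

    use archapp_wrap::archapp::archeng::blockrefstream::{blockref_stream, BlockrefItem};
    use err::Error;
    use futures_util::StreamExt;
    use netpod::{Channel, ChannelArchiver, NanoRange};

    // Drain the blockref stream for one channel and print what it yields.
    async fn dump_blockrefs(conf: ChannelArchiver) -> Result<(), Error> {
        // Same hard-coded channel and open time range as the HTTP handlers below.
        let channel = Channel {
            backend: "".into(),
            name: "ARIDI-PCT:CURRENT".into(),
        };
        let range = NanoRange { beg: 0, end: u64::MAX };
        let mut stream = Box::pin(blockref_stream(channel, range, conf));
        while let Some(item) = stream.next().await {
            match item? {
                // A resolved index record plus the datafile path it points to.
                BlockrefItem::Blockref(bref, jsval) => println!("{:?} {}", bref.dpath, jsval),
                // Status markers: "INIT", "NEXTPATH", "NOMORE", "DONE".
                BlockrefItem::JsVal(jsval) => println!("{}", jsval),
            }
        }
        Ok(())
    }
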
diff --git a/archapp/src/archeng/blockstream.rs b/archapp/src/archeng/blockstream.rs
index 8daebc2..08b59be 100644
--- a/archapp/src/archeng/blockstream.rs
+++ b/archapp/src/archeng/blockstream.rs
@@ -1,226 +1,369 @@
-use crate::archeng::backreadbuf::BackReadBuf;
-use crate::archeng::datablock::{read_data2, read_data_1, read_datafile_header, read_datafile_header2};
-use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec};
-use crate::archeng::indextree::{
-    read_datablockref, read_datablockref2, DataheaderPos, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget,
-};
-use crate::archeng::ringbuf::RingBuf;
-use crate::archeng::{open_read, seek, StatsChannel};
-use err::Error;
-use futures_core::{Future, Stream};
-use items::WithLen;
-#[allow(unused)]
-use netpod::log::*;
-use netpod::{Channel, ChannelArchiver, FilePos, NanoRange};
-use serde_json::Value as JsVal;
-use std::collections::{BTreeMap, VecDeque};
-use std::io::SeekFrom;
-use std::path::PathBuf;
-use std::pin::Pin;
-use tokio::fs::File;
-
-enum Steps {
-    Start,
-    SelectIndexFile,
-    SetupNextPath,
-    ReadBlocks(RecordIter, Box<dyn HeaderVersion>, PathBuf),
-    Done,
-}
-
-struct DataBlocks {
-    conf: ChannelArchiver,
-    channel: Channel,
-    range: NanoRange,
-    steps: Steps,
-    paths: VecDeque<String>,
-    file1: Option<BackReadBuf<File>>,
-    file2: Option<RingBuf<File>>,
-    last_dp: u64,
-    last_dp2: u64,
-    last_f2: String,
-    last_dfhpos: DataheaderPos,
-    dfnotfound: BTreeMap<String, bool>,
-    data_bytes_read: u64,
-    same_dfh_count: u64,
-}
-
-impl DataBlocks {
-    fn new(channel: Channel, range: NanoRange, conf: ChannelArchiver) -> Self {
-        Self {
-            conf,
-            channel,
-            range,
-            steps: Steps::Start,
-            paths: VecDeque::new(),
-            file1: None,
-            file2: None,
-            last_dp: 0,
-            last_dp2: 0,
-            last_f2: String::new(),
-            last_dfhpos: DataheaderPos(u64::MAX),
-            dfnotfound: BTreeMap::new(),
-            data_bytes_read: 0,
-            same_dfh_count: 0,
-        }
-    }
-
-    async fn exec(mut self) -> Result<Option<(JsVal, Self)>, Error> {
-        use Steps::*;
-        match self.steps {
-            Start => {
-                self.steps = SelectIndexFile;
-                Ok(Some((JsVal::Null, self)))
-            }
-            SelectIndexFile => {
-                let dbc = database_connect(&self.conf.database).await?;
-                let sql = "select path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
-                let rows = dbc.query(sql, &[&self.channel.name()]).await?;
-                for row in rows {
-                    self.paths.push_back(row.try_get(0)?);
-                }
-                self.steps = SetupNextPath;
-                Ok(Some((JsVal::String(format!("INIT")), self)))
-            }
-            SetupNextPath => {
-                let stats = &StatsChannel::dummy();
-                // For simplicity, simply read all storage classes linearly.
-                if let Some(path) = self.paths.pop_front() {
-                    // TODO
-                    let mut file = open_read(path.clone().into(), stats).await?;
-                    let basics = IndexFileBasics::from_file(&path, &mut file, stats).await?;
-                    let mut tree = basics
-                        .rtree_for_channel(self.channel.name(), stats)
-                        .await?
-                        .ok_or_else(|| Error::with_msg_no_trace("channel not in index files"))?;
-                    if let Some(iter) = tree.iter_range(self.range.clone(), stats).await? {
-                        debug!("SetupNextPath {:?}", path);
-                        self.steps = ReadBlocks(iter, basics.hver().duplicate(), path.clone().into());
-                        self.file1 = Some(BackReadBuf::new(file, 0, stats.clone()).await?);
-                    } else {
-                        self.steps = SetupNextPath;
-                    };
-                } else {
-                    self.steps = Done;
-                }
-                Ok(Some((JsVal::String(format!("NEXTPATH")), self)))
-            }
-            ReadBlocks(ref mut iter, ref hver, ref indexpath) => {
-                // TODO stats
-                let stats = &StatsChannel::dummy();
-                // TODO I need to keep some datafile open.
-                let item = if let Some(rec) = iter.next().await? {
-                    // TODO the iterator should actually return Dataref. We never expect child nodes here.
-                    if let RecordTarget::Dataref(dp) = rec.target {
-                        let f1 = self.file1.as_mut().unwrap();
-                        let dref = read_datablockref2(f1, dp.clone(), hver.as_ref()).await?;
-                        // TODO Remember the index path, need it here for relative path.
-                        // TODO open datafile, relative path to index path.
-                        // TODO keep open when path does not change.
-                        let acc;
-                        let num_samples;
-                        if true {
-                            if let Some(_) = self.dfnotfound.get(dref.file_name()) {
-                                num_samples = 0;
-                                acc = 1;
-                            } else {
-                                if dref.file_name() == self.last_f2 {
-                                    acc = 2;
-                                } else {
-                                    let dpath = indexpath.parent().unwrap().join(dref.file_name());
-                                    match open_read(dpath, stats).await {
-                                        Ok(f2) => {
-                                            acc = 4;
-                                            self.file2 = Some(
-                                                RingBuf::new(f2, dref.data_header_pos().0, StatsChannel::dummy())
-                                                    .await?,
-                                            );
-                                            self.last_f2 = dref.file_name().into();
-                                        }
-                                        Err(_) => {
-                                            acc = 3;
-                                            self.file2 = None;
-                                        }
-                                    }
-                                };
-                                if let Some(f2) = self.file2.as_mut() {
-                                    if dref.file_name() == self.last_f2 && dref.data_header_pos() == self.last_dfhpos {
-                                        num_samples = 0;
-                                    } else {
-                                        self.last_dfhpos = dref.data_header_pos();
-                                        let rp1 = f2.rp_abs();
-                                        let dfheader = read_datafile_header2(f2, dref.data_header_pos()).await?;
-                                        let data = read_data2(f2, &dfheader, self.range.clone(), false).await?;
-                                        let rp2 = f2.rp_abs();
-                                        self.data_bytes_read += rp2 - rp1;
-                                        num_samples = dfheader.num_samples;
-                                        if data.len() != num_samples as usize {
-                                            if (data.len() as i64 - num_samples as i64).abs() < 4 {
-                                                // TODO get always one event less than num_samples tells us.
-                                                //warn!("small deviation {} vs {}", data.len(), num_samples);
-                                            } else {
-                                                return Err(Error::with_msg_no_trace(format!(
-                                                    "event count mismatch {} vs {}",
-                                                    data.len(),
-                                                    num_samples
-                                                )));
-                                            }
-                                        }
-                                    }
-                                } else {
-                                    self.dfnotfound.insert(dref.file_name().into(), true);
-                                    num_samples = 0;
-                                };
-                            }
-                        } else {
-                            acc = 6;
-                            num_samples = 0;
-                        }
-                        let item = serde_json::to_value((
-                            dp.0,
-                            dp.0 as i64 - self.last_dp as i64,
-                            dref.file_name(),
-                            dref.data_header_pos.0,
-                            dref.data_header_pos.0 as i64 - self.last_dp2 as i64,
-                            dref.next().0,
-                            acc,
-                            num_samples,
-                        ))?;
-                        self.last_dp = dp.0;
-                        self.last_dp2 = dref.data_header_pos.0;
-                        item
-                    } else {
-                        panic!();
-                    }
-                } else {
-                    info!(
-                        "data_bytes_read: {} same_dfh_count: {}",
-                        self.data_bytes_read, self.same_dfh_count
-                    );
-                    self.steps = SetupNextPath;
-                    JsVal::String(format!("NOMORE"))
-                };
-                Ok(Some((item, self)))
-            }
-            Done => Ok(None),
-        }
-    }
-}
-
-impl UnfoldExec for DataBlocks {
-    type Output = JsVal;
-
-    fn exec(self) -> Pin<Box<dyn Future<Output = Result<Option<(Self::Output, Self)>, Error>> + Send>>
-    where
-        Self: Sized,
-    {
-        Box::pin(self.exec())
-    }
-}
-
-pub fn blockstream(
-    channel: Channel,
-    range: NanoRange,
-    conf: ChannelArchiver,
-) -> impl Stream<Item = Result<JsVal, Error>> {
-    unfold_stream(DataBlocks::new(channel, range, conf.clone()))
-}
+use super::indextree::DataheaderPos;
+use super::ringbuf::RingBuf;
+use super::{open_read, StatsChannel};
+use crate::archeng::blockrefstream::BlockrefItem;
+use crate::archeng::datablock::{read_data2, read_datafile_header2};
+use crate::eventsitem::EventsItem;
+use err::Error;
+use futures_core::{Future, Stream};
+use futures_util::stream::FuturesOrdered;
+use futures_util::StreamExt;
+use items::{WithLen, WithTimestamps};
+use netpod::{log::*, NanoRange};
+use serde::Serialize;
+use serde_json::Value as JsVal;
+use std::collections::{BTreeMap, VecDeque};
+use std::fmt;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::time::{Duration, Instant};
+use tokio::fs::File;
+
+#[derive(Debug, Serialize)]
+pub struct StatsAcc {
+    items: u64,
+    events: u64,
+    bytes: u64,
+    #[serde(skip)]
+    beg: Instant,
+}
+
+impl StatsAcc {
+    pub fn new() -> Self {
+        Self {
+            items: 0,
+            events: 0,
+            bytes: 0,
+            beg: Instant::now(),
+        }
+    }
+
+    fn add(&mut self, events: u64, bytes: u64) {
+        self.items += 1;
+        self.events += events;
+        self.bytes += bytes;
+    }
+
+    fn older(&self, dur: Duration) -> bool {
+        Instant::now().duration_since(self.beg) >= dur
+    }
+}
+
+struct Reader {
+    fname: String,
+    rb: RingBuf<File>,
+}
+
+impl Reader {}
+
+struct FutAItem {
+    fname: String,
+    path: PathBuf,
+    dfnotfound: bool,
+    reader: Option<Reader>,
+    bytes_read: u64,
+    events_read: u64,
+    events: Option<EventsItem>,
+}
+
+pub struct FutA {
+    fname: String,
+    pos: DataheaderPos,
+    reader: Option<Reader>,
+}
+
+impl Future for FutA {
+    type Output = Result<FutAItem, Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        use Poll::*;
+        err::todoval()
+    }
+}
+
+pub enum BlockItem {}
+
+pub struct BlockStream<S> {
+    inp: S,
+    inp_done: bool,
+    range: NanoRange,
+    dfnotfound: BTreeMap<PathBuf, bool>,
+    block_reads: FuturesOrdered<Pin<Box<dyn Future<Output = Result<FutAItem, Error>> + Send>>>,
+    max_reads: usize,
+    readers: VecDeque<Reader>,
+    last_dfname: String,
+    last_dfhpos: DataheaderPos,
+    ts_max: u64,
+    done: bool,
+    complete: bool,
+    acc: StatsAcc,
+    good_reader: u64,
+    discard_reader: u64,
+    not_found_hit: u64,
+    same_block: u64,
+}
+
+impl<S> BlockStream<S> {
+    pub fn new(inp: S, range: NanoRange, max_reads: usize) -> Self
+    where
+        S: Stream<Item = Result<BlockrefItem, Error>> + Unpin,
+    {
+        Self {
+            inp,
+            inp_done: false,
+            range,
+            dfnotfound: BTreeMap::new(),
+            block_reads: FuturesOrdered::new(),
+            max_reads,
+            readers: VecDeque::new(),
+            last_dfname: String::new(),
+            last_dfhpos: DataheaderPos(u64::MAX),
+            ts_max: 0,
+            done: false,
+            complete: false,
+            acc: StatsAcc::new(),
+            good_reader: 0,
+            discard_reader: 0,
+            not_found_hit: 0,
+            same_block: 0,
+        }
+    }
+}
+
+enum Int<T> {
+    NoWork,
+    Pending,
+    Empty,
+    Item(T),
+    Done,
+}
+
+impl<S> Stream for BlockStream<S>
+where
+    S: Stream<Item = Result<BlockrefItem, Error>> + Unpin,
+{
+    type Item = Result<JsVal, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        loop {
+            break if self.complete {
+                panic!("poll_next on complete")
+            } else if self.done {
+                self.complete = true;
+                Ready(None)
+            } else {
+                let item1 = if self.inp_done {
+                    Int::Done
+                } else if self.block_reads.len() >= self.max_reads {
+                    Int::NoWork
+                } else {
+                    match self.inp.poll_next_unpin(cx) {
+                        Ready(item) => match item {
+                            Some(item) => match item {
+                                Ok(item) => match item {
+                                    BlockrefItem::Blockref(bref, _jsval) => {
+                                        if let Some(_) = self.dfnotfound.get(&bref.dpath) {
+                                            self.not_found_hit += 1;
+                                        } else {
+                                            if bref.dref.file_name() == self.last_dfname
+                                                && bref.dref.data_header_pos() == self.last_dfhpos
+                                            {
+                                                self.same_block += 1;
+                                            } else {
+                                                let reader = if let Some(reader) = self.readers.pop_front() {
+                                                    if reader.fname == bref.dref.file_name() {
+                                                        self.good_reader += 1;
+                                                        Some(reader)
+                                                    } else {
+                                                        self.discard_reader += 1;
+                                                        None
+                                                    }
+                                                } else {
+                                                    None
+                                                };
+                                                let fname = bref.dref.file_name().to_string();
+                                                let dpath = bref.dpath;
+                                                let pos = bref.dref.data_header_pos();
+                                                let fut = {
+                                                    let fname = fname.clone();
+                                                    let pos = pos.clone();
+                                                    let range = self.range.clone();
+                                                    async move {
+                                                        let reader = if let Some(reader) = reader {
+                                                            Some(reader)
+                                                        } else {
+                                                            let stats = StatsChannel::dummy();
+                                                            info!("open new reader file {:?}", dpath);
+                                                            match open_read(dpath.clone(), &stats).await {
+                                                                Ok(file) => {
+                                                                    //
+                                                                    let reader = Reader {
+                                                                        fname: fname.clone(),
+                                                                        rb: RingBuf::new(file, pos.0, stats).await?,
+                                                                    };
+                                                                    Some(reader)
+                                                                }
+                                                                Err(_) => None,
+                                                            }
+                                                        };
+                                                        if let Some(mut reader) = reader {
+                                                            let rp1 = reader.rb.bytes_read();
+                                                            let dfheader =
+                                                                read_datafile_header2(&mut reader.rb, pos).await?;
+                                                            let data =
+                                                                read_data2(&mut reader.rb, &dfheader, range, false)
+                                                                    .await?;
+                                                            let rp2 = reader.rb.bytes_read();
+                                                            let bytes_read = rp2 - rp1;
+                                                            let ret = FutAItem {
+                                                                fname,
+                                                                path: dpath,
+                                                                dfnotfound: false,
+                                                                reader: Some(reader),
+                                                                bytes_read,
+                                                                events_read: data.len() as u64,
+                                                                events: Some(data),
+                                                            };
+                                                            Ok(ret)
+                                                        } else {
+                                                            let ret = FutAItem {
+                                                                fname,
+                                                                path: dpath,
+                                                                dfnotfound: true,
+                                                                reader: None,
+                                                                bytes_read: 0,
+                                                                events_read: 0,
+                                                                events: None,
+                                                            };
+                                                            Ok(ret)
+                                                        }
+                                                    }
+                                                };
+                                                self.block_reads.push(Box::pin(fut));
+                                                self.last_dfname = fname;
+                                                self.last_dfhpos = pos;
+                                            };
+                                        }
+                                        Int::Empty
+                                    }
+                                    BlockrefItem::JsVal(_jsval) => Int::Empty,
+                                },
+                                Err(e) => {
+                                    self.done = true;
+                                    Int::Item(Err(e))
+                                }
+                            },
+                            None => {
+                                self.inp_done = true;
+                                Int::Done
+                            }
+                        },
+                        Pending => Int::Pending,
+                    }
+                };
+                let item2 = if let Int::Item(_) = item1 {
+                    Int::NoWork
+                } else {
+                    if self.block_reads.len() == 0 {
+                        Int::NoWork
+                    } else {
+                        match self.block_reads.poll_next_unpin(cx) {
+                            Ready(Some(Ok(item))) => {
+                                //
+                                if item.dfnotfound {
+                                    self.dfnotfound.insert(item.path, true);
+                                }
+                                if let Some(reader) = item.reader {
+                                    self.readers.push_back(reader);
+                                }
+                                if let Some(ev) = &item.events {
+                                    for i in 0..ev.len() {
+                                        let ts = ev.ts(i);
+                                        if ts < self.ts_max {
+                                            let msg = format!("unordered event: {} {}", ts, self.ts_max);
+                                            error!("{}", msg);
+                                            self.done = true;
+                                            return Ready(Some(Err(Error::with_msg_no_trace(msg))));
+                                        }
+                                    }
+                                }
+                                self.acc.add(item.events_read, item.bytes_read);
+                                if false {
+                                    let item = JsVal::String(format!(
+                                        "bytes read {} {} events {}",
+                                        item.bytes_read,
+                                        item.events.is_some(),
+                                        item.events_read
+                                    ));
+                                }
+                                if self.acc.older(Duration::from_millis(1000)) {
+                                    let ret = std::mem::replace(&mut self.acc, StatsAcc::new());
+                                    match serde_json::to_value((ret, self.block_reads.len(), self.readers.len())) {
+                                        Ok(item) => Int::Item(Ok(item)),
+                                        Err(e) => {
+                                            self.done = true;
+                                            return Ready(Some(Err(e.into())));
+                                        }
+                                    }
+                                } else {
+                                    //Int::Item(Ok(item))
+                                    Int::Empty
+                                }
+                            }
+                            Ready(Some(Err(e))) => {
+                                self.done = true;
+                                Int::Item(Err(e))
+                            }
+                            Ready(None) => {
+                                panic!();
+                            }
+                            Pending => Int::Pending,
+                        }
+                    }
+                };
+                match (item1, item2) {
+                    (Int::Item(_), Int::Item(_)) => panic!(),
+                    (Int::NoWork, Int::NoWork) => panic!(),
+                    (_, Int::Done) => panic!(),
+                    (Int::Item(item), _) => Ready(Some(item)),
+                    (_, Int::Item(item)) => Ready(Some(item)),
+                    (Int::Pending | Int::NoWork, Int::Pending) => Pending,
+                    (Int::Pending, Int::NoWork) => Pending,
+                    (Int::Done, Int::Pending) => Pending,
+                    (Int::Pending | Int::Done | Int::Empty | Int::NoWork, Int::Empty) => continue,
+                    (Int::Empty, Int::Pending | Int::NoWork) => continue,
+                    (Int::Done, Int::NoWork) => {
+                        self.done = true;
+                        Ready(None)
+                    }
+                }
+            };
+        }
+    }
+}
+
+impl<S> fmt::Debug for BlockStream<S> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("BlockStream")
+            .field("inp_done", &self.inp_done)
+            .field("range", &self.range)
+            .field("max_reads", &self.max_reads)
+            .field("ts_max", &self.ts_max)
+            .field("done", &self.done)
+            .field("complete", &self.complete)
+            .field("acc", &self.acc)
+            .field("good_reader", &self.good_reader)
+            .field("discard_reader", &self.discard_reader)
+            .field("not_found_hit", &self.not_found_hit)
+            .field("same_block", &self.same_block)
+            .finish()
+    }
+}
+
+impl<S> Drop for BlockStream<S> {
+    fn drop(&mut self) {
+        info!("Drop {:?}", self);
+    }
+}
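Note on the new stream output: instead of one JSON value per block, BlockStream
emits roughly one stats line per second, the serialization of
(StatsAcc, queued block reads, pooled readers), i.e. a three-element array such
as [{"items":412,"events":98210,"bytes":5242880},4,2] (illustrative numbers).
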
diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs
index 4804bb7..6b68699 100644
--- a/archapp/src/archeng/datablock.rs
+++ b/archapp/src/archeng/datablock.rs
@@ -251,7 +251,7 @@ pub async fn read_data2(
         }
     }
     rb.adv(n3);
-    info!("parsed block with {} / {} events", ntot, evs.tss.len());
+    //info!("parsed block with {} / {} events", ntot, evs.tss.len());
     let evs = ScalarPlainEvents::Double(evs);
     let plain = PlainEvents::Scalar(evs);
    let item = EventsItem::Plain(plain);
diff --git a/archapp/src/archeng/ringbuf.rs b/archapp/src/archeng/ringbuf.rs
index 4cd9002..227ac51 100644
--- a/archapp/src/archeng/ringbuf.rs
+++ b/archapp/src/archeng/ringbuf.rs
@@ -16,6 +16,9 @@ pub struct RingBuf<F> {
     seek_request: u64,
     seek_done: u64,
     read_done: u64,
+    small_pos: u64,
+    small_neg: u64,
+    bytes_read: u64,
 }
 
 impl<F> RingBuf<F>
@@ -33,6 +36,9 @@ where
             seek_request: 0,
             seek_done: 0,
             read_done: 0,
+            small_pos: 0,
+            small_neg: 0,
+            bytes_read: 0,
         };
         ret.seek(pos).await?;
         Ok(ret)
@@ -67,14 +73,16 @@ where
                 self.rp = 0;
             }
         }
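+        // Cap a single refill read at 8 KiB.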
+        let max = (self.buf.len() - self.wp).min(1024 * 8) + self.wp;
         let n = read(
             self.file.as_mut().unwrap().borrow_mut(),
-            &mut self.buf[self.wp..],
+            &mut self.buf[self.wp..max],
             &self.stats,
         )
         .await?;
         self.wp += n;
         self.read_done += 1;
+        self.bytes_read += n as u64;
         Ok(n)
     }
 
@@ -90,11 +98,13 @@ where
     pub async fn seek(&mut self, pos: u64) -> Result<u64, Error> {
-        let dp = pos as i64 - self.abs as i64 - self.rp as i64;
+        let dp = pos as i64 - self.rp_abs() as i64;
         if dp < 0 && dp > -2048 {
             debug!("small NEG seek {}", dp);
         } else if dp == 0 {
-            debug!("zero seek");
+            // TODO check callsites, some cases could be eliminated.
+            //debug!("zero seek");
+            return Ok(pos);
         } else if dp > 0 && dp < 2048 {
             debug!("small POS seek {}", dp);
         }
@@ -116,6 +126,10 @@ where
     pub fn rp_abs(&self) -> u64 {
         self.abs as u64 + self.rp as u64
     }
+
+    pub fn bytes_read(&self) -> u64 {
+        self.bytes_read
+    }
 }
 
 impl<F> fmt::Debug for RingBuf<F> {
@@ -124,12 +138,18 @@ impl<F> fmt::Debug for RingBuf<F> {
             .field("abs", &self.abs)
             .field("wp", &self.wp)
             .field("rp", &self.rp)
+            .field("seek_request", &self.seek_request)
+            .field("seek_done", &self.seek_done)
+            .field("read_done", &self.read_done)
+            .field("small_pos", &self.small_pos)
+            .field("small_neg", &self.small_neg)
+            .field("bytes_read", &self.bytes_read)
             .finish()
     }
 }
 
 impl<F> Drop for RingBuf<F> {
     fn drop(&mut self) {
-        info!("RingBuf Drop {} {}", self.seek_request, self.read_done);
+        info!("Drop {:?}", self);
     }
 }
diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs
index ba43d24..4663f3f 100644
--- a/httpret/src/channelarchiver.rs
+++ b/httpret/src/channelarchiver.rs
@@ -1,12 +1,15 @@
 use crate::response;
+use disk::events::PlainEventsJsonQuery;
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use http::{header, Method, Request, Response, StatusCode};
 use hyper::Body;
-use netpod::{log::*, Channel, NanoRange};
+use netpod::query::RawEventsQuery;
+use netpod::{get_url_query_pairs, log::*, Channel, NanoRange};
 use netpod::{NodeConfigCached, APP_JSON_LINES};
 use serde::Serialize;
+use url::Url;
 
 fn json_lines_stream<S>(stream: S) -> impl Stream<Item = Result<Vec<u8>, Error>>
 where
@@ -155,6 +158,67 @@ impl ScanChannels {
     }
 }
 
+pub struct BlockRefStream {}
+
+impl BlockRefStream {
+    pub fn prefix() -> &'static str {
+        "/api/4/channelarchiver/blockrefstream"
+    }
+
+    pub fn name() -> &'static str {
+        "BlockRefStream"
+    }
+
+    pub fn should_handle(path: &str) -> Option<Self> {
+        if path.starts_with(Self::prefix()) {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+        if req.method() != Method::GET {
+            return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
+        }
+        info!("{} handle uri: {:?}", Self::name(), req.uri());
+        let conf = node_config
+            .node
+            .channel_archiver
+            .as_ref()
+            .ok_or(Error::with_msg_no_trace(
+                "this node is not configured as channel archiver",
+            ))?;
+        let range = NanoRange { beg: 0, end: u64::MAX };
+        let channel = Channel {
+            backend: "".into(),
+            name: "ARIDI-PCT:CURRENT".into(),
+        };
+        let s = archapp_wrap::archapp::archeng::blockrefstream::blockref_stream(channel, range, conf.clone());
+        let s = s.map(|item| match item {
+            Ok(item) => {
+                use archapp_wrap::archapp::archeng::blockrefstream::BlockrefItem::*;
+                match item {
+                    Blockref(_k, jsval) => Ok(jsval),
+                    JsVal(jsval) => Ok(jsval),
+                }
+            }
+            Err(e) => Err(e),
+        });
+        let s = json_lines_stream(s);
+        let s = s.map(|item| match item {
+            Ok(k) => Ok(k),
+            Err(e) => {
+                error!("observe error: {}", e);
+                Err(e)
+            }
+        });
+        Ok(response(StatusCode::OK)
+            .header(header::CONTENT_TYPE, APP_JSON_LINES)
+            .body(Body::wrap_stream(s))?)
+    }
+}
+
 pub struct BlockStream {}
 
 impl BlockStream {
@@ -191,7 +255,19 @@ impl BlockStream {
             backend: "".into(),
             name: "ARIDI-PCT:CURRENT".into(),
         };
-        let s = archapp_wrap::archapp::archeng::blockstream::blockstream(channel, range, conf.clone());
+        let url = Url::parse(&format!("dummy:{}", req.uri()))?;
+        let pairs = get_url_query_pairs(&url);
+        let read_queue = pairs.get("readQueue").unwrap_or(&"1".to_string()).parse()?;
+        let s = archapp_wrap::archapp::archeng::blockrefstream::blockref_stream(channel, range.clone(), conf.clone());
+        let s = Box::pin(s);
+        let s = archapp_wrap::archapp::archeng::blockstream::BlockStream::new(s, range.clone(), read_queue);
+        let s = s.map(|item| match item {
+            Ok(item) => {
+                //use archapp_wrap::archapp::archeng::blockstream::BlockItem::*;
+                Ok(item)
+            }
+            Err(e) => Err(e),
+        });
         let s = json_lines_stream(s);
         let s = s.map(|item| match item {
             Ok(k) => Ok(k),
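The blockstream handler now reads an optional readQueue query parameter
(default 1), which becomes max_reads, the bound on concurrently queued block
reads in BlockStream; e.g. requesting ...?readQueue=4 (illustrative value)
allows four datafile reads in flight at once.
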
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 455e1be..f50d5c3 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -290,6 +290,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
         h.handle(req, &node_config).await
     } else if let Some(h) = channelarchiver::ScanChannels::should_handle(path) {
         h.handle(req, &node_config).await
+    } else if let Some(h) = channelarchiver::BlockRefStream::should_handle(path) {
+        h.handle(req, &node_config).await
     } else if let Some(h) = channelarchiver::BlockStream::should_handle(path) {
         h.handle(req, &node_config).await
     } else if path.starts_with("/api/1/requestStatus/") {