Test on the VM with NFS again confirms low IOPS

Dominik Werder
2021-11-02 09:08:58 +01:00
parent f48e4b8ea4
commit 4d3965660c
8 changed files with 707 additions and 213 deletions

View File

@@ -1,4 +1,5 @@
pub mod backreadbuf;
pub mod blockrefstream;
pub mod blockstream;
pub mod bufminread;
pub mod datablock;

View File

@@ -1,7 +1,9 @@
use crate::archeng::{read, seek, StatsChannel};
use err::Error;
use netpod::log::*;
use std::{borrow::BorrowMut, io::SeekFrom};
use std::borrow::BorrowMut;
use std::fmt;
use std::io::SeekFrom;
use tokio::fs::File;
pub struct BackReadBuf<F> {
@@ -106,13 +108,21 @@ where
}
}
impl<F> Drop for BackReadBuf<F> {
fn drop(&mut self) {
info!(
"BackReadBuf Drop {} {}% {}",
self.seek_request,
self.seek_done * 100 / self.seek_request,
self.read_done
);
impl<F> fmt::Debug for BackReadBuf<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BackReadBuf")
.field("abs", &self.abs)
.field("wp", &self.wp)
.field("rp", &self.rp)
.field("seek_request", &self.seek_request)
.field("seek_done", &self.seek_done)
.field("read_done", &self.read_done)
.finish()
}
}
impl<F> Drop for BackReadBuf<F> {
fn drop(&mut self) {
info!("Drop {:?}", self);
}
}
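A side note on the hunk above: the removed Drop impl computed self.seek_done * 100 / self.seek_request, which panics with a division by zero if the buffer is dropped before any seek was ever requested; the replacement routes all state formatting through fmt::Debug and lets Drop log via "{:?}". A minimal standalone sketch of that Drop-logs-Debug pattern (Counter is an illustrative type, not from this codebase):

use std::fmt;

struct Counter {
    seek_request: u64,
    seek_done: u64,
}

impl fmt::Debug for Counter {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Counter")
            .field("seek_request", &self.seek_request)
            .field("seek_done", &self.seek_done)
            .finish()
    }
}

impl Drop for Counter {
    fn drop(&mut self) {
        // One formatting path for both logging and debugging; no
        // percentage arithmetic that could divide by zero.
        println!("Drop {:?}", self);
    }
}

fn main() {
    let _c = Counter { seek_request: 0, seek_done: 0 };
}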

View File

@@ -0,0 +1,242 @@
use crate::archeng::backreadbuf::BackReadBuf;
use crate::archeng::datablock::{read_data2, read_data_1, read_datafile_header, read_datafile_header2};
use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec};
use crate::archeng::indextree::{
read_datablockref, read_datablockref2, DataheaderPos, Dataref, HeaderVersion, IndexFileBasics, RecordIter,
RecordTarget,
};
use crate::archeng::ringbuf::RingBuf;
use crate::archeng::{open_read, seek, StatsChannel};
use err::Error;
use futures_core::{Future, Stream};
use items::WithLen;
#[allow(unused)]
use netpod::log::*;
use netpod::{Channel, ChannelArchiver, FilePos, NanoRange};
use serde::Serialize;
use serde_json::Value as JsVal;
use std::collections::{BTreeMap, VecDeque};
use std::io::SeekFrom;
use std::path::PathBuf;
use std::pin::Pin;
use tokio::fs::File;
#[derive(Debug)]
pub struct Blockref {
pub dref: Dataref,
pub dpath: PathBuf,
}
#[derive(Debug)]
pub enum BlockrefItem {
Blockref(Blockref, JsVal),
JsVal(JsVal),
}
enum Steps {
Start,
SelectIndexFile,
SetupNextPath,
ReadBlocks(RecordIter, Box<dyn HeaderVersion>, PathBuf),
Done,
}
struct BlockrefStream {
conf: ChannelArchiver,
channel: Channel,
range: NanoRange,
steps: Steps,
paths: VecDeque<String>,
file1: Option<BackReadBuf<File>>,
file2: Option<RingBuf<File>>,
last_dp: u64,
last_dp2: u64,
last_f2: String,
last_dfhpos: DataheaderPos,
dfnotfound: BTreeMap<String, bool>,
data_bytes_read: u64,
same_dfh_count: u64,
}
impl BlockrefStream {
fn new(channel: Channel, range: NanoRange, conf: ChannelArchiver) -> Self {
Self {
conf,
channel,
range,
steps: Steps::Start,
paths: VecDeque::new(),
file1: None,
file2: None,
last_dp: 0,
last_dp2: 0,
last_f2: String::new(),
last_dfhpos: DataheaderPos(u64::MAX),
dfnotfound: BTreeMap::new(),
data_bytes_read: 0,
same_dfh_count: 0,
}
}
async fn exec(mut self) -> Result<Option<(BlockrefItem, Self)>, Error> {
use Steps::*;
match self.steps {
Start => {
self.steps = SelectIndexFile;
Ok(Some((BlockrefItem::JsVal(JsVal::Null), self)))
}
SelectIndexFile => {
let dbc = database_connect(&self.conf.database).await?;
let sql = "select path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
let rows = dbc.query(sql, &[&self.channel.name()]).await?;
for row in rows {
self.paths.push_back(row.try_get(0)?);
}
self.steps = SetupNextPath;
Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("INIT"))), self)))
}
SetupNextPath => {
let stats = &StatsChannel::dummy();
// For simplicity, read all storage classes linearly.
if let Some(path) = self.paths.pop_front() {
// TODO
let mut file = open_read(path.clone().into(), stats).await?;
let basics = IndexFileBasics::from_file(&path, &mut file, stats).await?;
let mut tree = basics
.rtree_for_channel(self.channel.name(), stats)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not in index files"))?;
if let Some(iter) = tree.iter_range(self.range.clone(), stats).await? {
debug!("SetupNextPath {:?}", path);
self.steps = ReadBlocks(iter, basics.hver().duplicate(), path.clone().into());
self.file1 = Some(BackReadBuf::new(file, 0, stats.clone()).await?);
} else {
self.steps = SetupNextPath;
};
Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("NEXTPATH"))), self)))
} else {
self.steps = Done;
Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("DONE"))), self)))
}
}
ReadBlocks(ref mut iter, ref hver, ref indexpath) => {
// TODO stats
let stats = &StatsChannel::dummy();
// TODO I need to keep some datafile open.
let item = if let Some(rec) = iter.next().await? {
// TODO the iterator should actually return Dataref. We never expect child nodes here.
if let RecordTarget::Dataref(dp) = rec.target {
let f1 = self.file1.as_mut().unwrap();
let dref = read_datablockref2(f1, dp.clone(), hver.as_ref()).await?;
let dpath = indexpath.parent().unwrap().join(dref.file_name());
// TODO Remember the index path; it is needed here to resolve the relative data file path.
// TODO Open the datafile via the path relative to the index path.
// TODO Keep the datafile open while the path does not change.
let acc;
let num_samples;
if false {
if let Some(_) = self.dfnotfound.get(dref.file_name()) {
num_samples = 0;
acc = 1;
} else {
if dref.file_name() == self.last_f2 {
acc = 2;
} else {
match open_read(dpath.clone(), stats).await {
Ok(f2) => {
acc = 4;
self.file2 = Some(
RingBuf::new(f2, dref.data_header_pos().0, StatsChannel::dummy())
.await?,
);
self.last_f2 = dref.file_name().into();
}
Err(_) => {
acc = 3;
self.file2 = None;
}
}
};
if let Some(f2) = self.file2.as_mut() {
if dref.file_name() == self.last_f2 && dref.data_header_pos() == self.last_dfhpos {
num_samples = 0;
} else {
self.last_dfhpos = dref.data_header_pos();
let rp1 = f2.rp_abs();
let dfheader = read_datafile_header2(f2, dref.data_header_pos()).await?;
let data = read_data2(f2, &dfheader, self.range.clone(), false).await?;
let rp2 = f2.rp_abs();
self.data_bytes_read += rp2 - rp1;
num_samples = dfheader.num_samples;
if data.len() != num_samples as usize {
if (data.len() as i64 - num_samples as i64).abs() < 4 {
// TODO we always get one event fewer than num_samples indicates.
//warn!("small deviation {} vs {}", data.len(), num_samples);
} else {
return Err(Error::with_msg_no_trace(format!(
"event count mismatch {} vs {}",
data.len(),
num_samples
)));
}
}
}
} else {
self.dfnotfound.insert(dref.file_name().into(), true);
num_samples = 0;
};
}
} else {
acc = 6;
num_samples = 0;
}
let jsval = serde_json::to_value((
dp.0,
dp.0 as i64 - self.last_dp as i64,
dref.file_name(),
dref.data_header_pos.0,
dref.data_header_pos.0 as i64 - self.last_dp2 as i64,
dref.next().0,
acc,
num_samples,
))?;
self.last_dp = dp.0;
self.last_dp2 = dref.data_header_pos.0;
let bref = Blockref { dref, dpath };
BlockrefItem::Blockref(bref, jsval)
} else {
panic!();
}
} else {
info!(
"data_bytes_read: {} same_dfh_count: {}",
self.data_bytes_read, self.same_dfh_count
);
self.steps = SetupNextPath;
BlockrefItem::JsVal(JsVal::String(format!("NOMORE")))
};
Ok(Some((item, self)))
}
Done => Ok(None),
}
}
}
impl UnfoldExec for BlockrefStream {
type Output = BlockrefItem;
fn exec(self) -> Pin<Box<dyn Future<Output = Result<Option<(Self::Output, Self)>, Error>> + Send>>
where
Self: Sized,
{
Box::pin(self.exec())
}
}
pub fn blockref_stream(
channel: Channel,
range: NanoRange,
conf: ChannelArchiver,
) -> impl Stream<Item = Result<BlockrefItem, Error>> {
unfold_stream(BlockrefStream::new(channel, range, conf.clone()))
}
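The new file above drives its Steps state machine through the crate's unfold_stream/UnfoldExec helpers: exec consumes the state and resolves to Ok(Some((item, next_state))) until Done resolves to Ok(None). A reduced sketch of the same shape using futures_util::stream::try_unfold (the Steps variants here are illustrative, not the crate's):

use futures_util::stream::{self, TryStreamExt};

enum Steps {
    Start,
    Work(u32),
    Done,
}

#[tokio::main]
async fn main() {
    let s = stream::try_unfold(Steps::Start, |st| async move {
        match st {
            // Each step yields one item plus the successor state,
            // like exec() returning Ok(Some((item, self))).
            Steps::Start => Ok(Some(("INIT".to_string(), Steps::Work(2)))),
            Steps::Work(0) => Ok(Some(("NOMORE".to_string(), Steps::Done))),
            Steps::Work(n) => Ok(Some((format!("block {}", n), Steps::Work(n - 1)))),
            // Done ends the stream, like exec() returning Ok(None).
            Steps::Done => Ok(None),
        }
    });
    let items: Result<Vec<String>, std::io::Error> = s.try_collect().await;
    println!("{:?}", items);
}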

View File

@@ -1,226 +1,369 @@
use crate::archeng::backreadbuf::BackReadBuf;
use crate::archeng::datablock::{read_data2, read_data_1, read_datafile_header, read_datafile_header2};
use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec};
use crate::archeng::indextree::{
read_datablockref, read_datablockref2, DataheaderPos, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget,
};
use crate::archeng::ringbuf::RingBuf;
use crate::archeng::{open_read, seek, StatsChannel};
use super::indextree::DataheaderPos;
use super::ringbuf::RingBuf;
use super::{open_read, StatsChannel};
use crate::archeng::blockrefstream::BlockrefItem;
use crate::archeng::datablock::{read_data2, read_datafile_header2};
use crate::eventsitem::EventsItem;
use err::Error;
use futures_core::{Future, Stream};
use items::WithLen;
#[allow(unused)]
use netpod::log::*;
use netpod::{Channel, ChannelArchiver, FilePos, NanoRange};
use futures_util::stream::FuturesOrdered;
use futures_util::StreamExt;
use items::{WithLen, WithTimestamps};
use netpod::{log::*, NanoRange};
use serde::Serialize;
use serde_json::Value as JsVal;
use std::collections::{BTreeMap, VecDeque};
use std::io::SeekFrom;
use std::fmt;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::fs::File;
enum Steps {
Start,
SelectIndexFile,
SetupNextPath,
ReadBlocks(RecordIter, Box<dyn HeaderVersion>, PathBuf),
#[derive(Debug, Serialize)]
pub struct StatsAcc {
items: u64,
events: u64,
bytes: u64,
#[serde(skip)]
beg: Instant,
}
impl StatsAcc {
pub fn new() -> Self {
Self {
items: 0,
events: 0,
bytes: 0,
beg: Instant::now(),
}
}
fn add(&mut self, events: u64, bytes: u64) {
self.items += 1;
self.events += events;
self.bytes += bytes;
}
fn older(&self, dur: Duration) -> bool {
Instant::now().duration_since(self.beg) >= dur
}
}
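StatsAcc above implements a simple windowed throughput counter: each processed block calls add, and once the window is about one second old (the older(Duration::from_millis(1000)) check in poll_next further down), the filled accumulator is swapped out with mem::replace and emitted as a JSON stats item. A minimal sketch of that flush pattern, with illustrative names:

use std::time::{Duration, Instant};

struct Acc {
    items: u64,
    beg: Instant,
}

impl Acc {
    fn new() -> Self {
        Acc { items: 0, beg: Instant::now() }
    }
}

// Count one item; if the current window is old enough, hand the
// filled accumulator back to the caller and start a fresh window.
fn on_item(acc: &mut Acc) -> Option<Acc> {
    acc.items += 1;
    if acc.beg.elapsed() >= Duration::from_millis(1000) {
        Some(std::mem::replace(acc, Acc::new()))
    } else {
        None
    }
}

fn main() {
    let mut acc = Acc::new();
    for _ in 0..1000 {
        if let Some(window) = on_item(&mut acc) {
            println!("emit stats: {} items", window.items);
        }
    }
}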
struct Reader {
fname: String,
rb: RingBuf<File>,
}
impl Reader {}
struct FutAItem {
fname: String,
path: PathBuf,
dfnotfound: bool,
reader: Option<Reader>,
bytes_read: u64,
events_read: u64,
events: Option<EventsItem>,
}
pub struct FutA {
fname: String,
pos: DataheaderPos,
reader: Option<Reader>,
}
impl Future for FutA {
type Output = Result<JsVal, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
err::todoval()
}
}
pub enum BlockItem {}
pub struct BlockStream<S> {
inp: S,
inp_done: bool,
range: NanoRange,
dfnotfound: BTreeMap<PathBuf, bool>,
block_reads: FuturesOrdered<Pin<Box<dyn Future<Output = Result<FutAItem, Error>> + Send>>>,
max_reads: usize,
readers: VecDeque<Reader>,
last_dfname: String,
last_dfhpos: DataheaderPos,
ts_max: u64,
done: bool,
complete: bool,
acc: StatsAcc,
good_reader: u64,
discard_reader: u64,
not_found_hit: u64,
same_block: u64,
}
impl<S> BlockStream<S> {
pub fn new(inp: S, range: NanoRange, max_reads: usize) -> Self
where
S: Stream<Item = Result<BlockrefItem, Error>> + Unpin,
{
Self {
inp,
inp_done: false,
range,
dfnotfound: BTreeMap::new(),
block_reads: FuturesOrdered::new(),
max_reads,
readers: VecDeque::new(),
last_dfname: String::new(),
last_dfhpos: DataheaderPos(u64::MAX),
ts_max: 0,
done: false,
complete: false,
acc: StatsAcc::new(),
good_reader: 0,
discard_reader: 0,
not_found_hit: 0,
same_block: 0,
}
}
}
enum Int<T> {
NoWork,
Pending,
Empty,
Item(T),
Done,
}
struct DataBlocks {
conf: ChannelArchiver,
channel: Channel,
range: NanoRange,
steps: Steps,
paths: VecDeque<String>,
file1: Option<BackReadBuf<File>>,
file2: Option<RingBuf<File>>,
last_dp: u64,
last_dp2: u64,
last_f2: String,
last_dfhpos: DataheaderPos,
dfnotfound: BTreeMap<String, bool>,
data_bytes_read: u64,
same_dfh_count: u64,
}
impl<S> Stream for BlockStream<S>
where
S: Stream<Item = Result<BlockrefItem, Error>> + Unpin,
{
type Item = Result<JsVal, Error>;
impl DataBlocks {
fn new(channel: Channel, range: NanoRange, conf: ChannelArchiver) -> Self {
Self {
conf,
channel,
range,
steps: Steps::Start,
paths: VecDeque::new(),
file1: None,
file2: None,
last_dp: 0,
last_dp2: 0,
last_f2: String::new(),
last_dfhpos: DataheaderPos(u64::MAX),
dfnotfound: BTreeMap::new(),
data_bytes_read: 0,
same_dfh_count: 0,
}
}
async fn exec(mut self) -> Result<Option<(JsVal, Self)>, Error> {
use Steps::*;
match self.steps {
Start => {
self.steps = SelectIndexFile;
Ok(Some((JsVal::Null, self)))
}
SelectIndexFile => {
let dbc = database_connect(&self.conf.database).await?;
let sql = "select path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index";
let rows = dbc.query(sql, &[&self.channel.name()]).await?;
for row in rows {
self.paths.push_back(row.try_get(0)?);
}
self.steps = SetupNextPath;
Ok(Some((JsVal::String(format!("INIT")), self)))
}
SetupNextPath => {
let stats = &StatsChannel::dummy();
// For simplicity, read all storage classes linearly.
if let Some(path) = self.paths.pop_front() {
// TODO
let mut file = open_read(path.clone().into(), stats).await?;
let basics = IndexFileBasics::from_file(&path, &mut file, stats).await?;
let mut tree = basics
.rtree_for_channel(self.channel.name(), stats)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not in index files"))?;
if let Some(iter) = tree.iter_range(self.range.clone(), stats).await? {
debug!("SetupNextPath {:?}", path);
self.steps = ReadBlocks(iter, basics.hver().duplicate(), path.clone().into());
self.file1 = Some(BackReadBuf::new(file, 0, stats.clone()).await?);
} else {
self.steps = SetupNextPath;
};
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if self.complete {
panic!("poll_next on complete")
} else if self.done {
self.complete = true;
Ready(None)
} else {
let item1 = if self.inp_done {
Int::Done
} else if self.block_reads.len() >= self.max_reads {
Int::NoWork
} else {
self.steps = Done;
}
Ok(Some((JsVal::String(format!("NEXTPATH")), self)))
}
ReadBlocks(ref mut iter, ref hver, ref indexpath) => {
// TODO stats
let stats = &StatsChannel::dummy();
// TODO I need to keep some datafile open.
let item = if let Some(rec) = iter.next().await? {
// TODO the iterator should actually return Dataref. We never expect child nodes here.
if let RecordTarget::Dataref(dp) = rec.target {
let f1 = self.file1.as_mut().unwrap();
let dref = read_datablockref2(f1, dp.clone(), hver.as_ref()).await?;
// TODO Remember the index path; it is needed here to resolve the relative data file path.
// TODO Open the datafile via the path relative to the index path.
// TODO Keep the datafile open while the path does not change.
let acc;
let num_samples;
if true {
if let Some(_) = self.dfnotfound.get(dref.file_name()) {
num_samples = 0;
acc = 1;
} else {
if dref.file_name() == self.last_f2 {
acc = 2;
} else {
let dpath = indexpath.parent().unwrap().join(dref.file_name());
match open_read(dpath, stats).await {
Ok(f2) => {
acc = 4;
self.file2 = Some(
RingBuf::new(f2, dref.data_header_pos().0, StatsChannel::dummy())
.await?,
);
self.last_f2 = dref.file_name().into();
}
Err(_) => {
acc = 3;
self.file2 = None;
}
}
};
if let Some(f2) = self.file2.as_mut() {
if dref.file_name() == self.last_f2 && dref.data_header_pos() == self.last_dfhpos {
num_samples = 0;
} else {
self.last_dfhpos = dref.data_header_pos();
let rp1 = f2.rp_abs();
let dfheader = read_datafile_header2(f2, dref.data_header_pos()).await?;
let data = read_data2(f2, &dfheader, self.range.clone(), false).await?;
let rp2 = f2.rp_abs();
self.data_bytes_read += rp2 - rp1;
num_samples = dfheader.num_samples;
if data.len() != num_samples as usize {
if (data.len() as i64 - num_samples as i64).abs() < 4 {
// TODO we always get one event fewer than num_samples indicates.
//warn!("small deviation {} vs {}", data.len(), num_samples);
match self.inp.poll_next_unpin(cx) {
Ready(item) => match item {
Some(item) => match item {
Ok(item) => match item {
BlockrefItem::Blockref(bref, _jsval) => {
if let Some(_) = self.dfnotfound.get(&bref.dpath) {
self.not_found_hit += 1;
} else {
if bref.dref.file_name() == self.last_dfname
&& bref.dref.data_header_pos() == self.last_dfhpos
{
self.same_block += 1;
} else {
return Err(Error::with_msg_no_trace(format!(
"event count mismatch {} vs {}",
data.len(),
num_samples
)));
}
let reader = if let Some(reader) = self.readers.pop_front() {
if reader.fname == bref.dref.file_name() {
self.good_reader += 1;
Some(reader)
} else {
self.discard_reader += 1;
None
}
} else {
None
};
let fname = bref.dref.file_name().to_string();
let dpath = bref.dpath;
let pos = bref.dref.data_header_pos();
let fut = {
let fname = fname.clone();
let pos = pos.clone();
let range = self.range.clone();
async move {
let reader = if let Some(reader) = reader {
Some(reader)
} else {
let stats = StatsChannel::dummy();
info!("open new reader file {:?}", dpath);
match open_read(dpath.clone(), &stats).await {
Ok(file) => {
//
let reader = Reader {
fname: fname.clone(),
rb: RingBuf::new(file, pos.0, stats).await?,
};
Some(reader)
}
Err(_) => None,
}
};
if let Some(mut reader) = reader {
let rp1 = reader.rb.bytes_read();
let dfheader =
read_datafile_header2(&mut reader.rb, pos).await?;
let data =
read_data2(&mut reader.rb, &dfheader, range, false)
.await?;
let rp2 = reader.rb.bytes_read();
let bytes_read = rp2 - rp1;
let ret = FutAItem {
fname,
path: dpath,
dfnotfound: false,
reader: Some(reader),
bytes_read,
events_read: data.len() as u64,
events: Some(data),
};
Ok(ret)
} else {
let ret = FutAItem {
fname,
path: dpath,
dfnotfound: true,
reader: None,
bytes_read: 0,
events_read: 0,
events: None,
};
Ok(ret)
}
}
};
self.block_reads.push(Box::pin(fut));
self.last_dfname = fname;
self.last_dfhpos = pos;
};
}
Int::Empty
}
BlockrefItem::JsVal(_jsval) => Int::Empty,
},
Err(e) => {
self.done = true;
Int::Item(Err(e))
}
},
None => {
self.inp_done = true;
Int::Done
}
},
Pending => Int::Pending,
}
};
let item2 = if let Int::Item(_) = item1 {
Int::NoWork
} else {
if self.block_reads.len() == 0 {
Int::NoWork
} else {
match self.block_reads.poll_next_unpin(cx) {
Ready(Some(Ok(item))) => {
//
if item.dfnotfound {
self.dfnotfound.insert(item.path, true);
}
if let Some(reader) = item.reader {
self.readers.push_back(reader);
}
if let Some(ev) = &item.events {
for i in 0..ev.len() {
let ts = ev.ts(i);
if ts < self.ts_max {
let msg = format!("unordered event: {} {}", ts, self.ts_max);
error!("{}", msg);
self.done = true;
return Ready(Some(Err(Error::with_msg_no_trace(msg))));
}
}
}
self.acc.add(item.events_read, item.bytes_read);
if false {
let item = JsVal::String(format!(
"bytes read {} {} events {}",
item.bytes_read,
item.events.is_some(),
item.events_read
));
}
if self.acc.older(Duration::from_millis(1000)) {
let ret = std::mem::replace(&mut self.acc, StatsAcc::new());
match serde_json::to_value((ret, self.block_reads.len(), self.readers.len())) {
Ok(item) => Int::Item(Ok(item)),
Err(e) => {
self.done = true;
return Ready(Some(Err(e.into())));
}
}
} else {
self.dfnotfound.insert(dref.file_name().into(), true);
num_samples = 0;
};
//Int::Item(Ok(item))
Int::Empty
}
}
} else {
acc = 6;
num_samples = 0;
Ready(Some(Err(e))) => {
self.done = true;
Int::Item(Err(e))
}
Ready(None) => {
panic!();
}
Pending => Int::Pending,
}
let item = serde_json::to_value((
dp.0,
dp.0 as i64 - self.last_dp as i64,
dref.file_name(),
dref.data_header_pos.0,
dref.data_header_pos.0 as i64 - self.last_dp2 as i64,
dref.next().0,
acc,
num_samples,
))?;
self.last_dp = dp.0;
self.last_dp2 = dref.data_header_pos.0;
item
} else {
panic!();
}
} else {
info!(
"data_bytes_read: {} same_dfh_count: {}",
self.data_bytes_read, self.same_dfh_count
);
self.steps = SetupNextPath;
JsVal::String(format!("NOMORE"))
};
Ok(Some((item, self)))
}
Done => Ok(None),
match (item1, item2) {
(Int::Item(_), Int::Item(_)) => panic!(),
(Int::NoWork, Int::NoWork) => panic!(),
(_, Int::Done) => panic!(),
(Int::Item(item), _) => Ready(Some(item)),
(_, Int::Item(item)) => Ready(Some(item)),
(Int::Pending | Int::NoWork, Int::Pending) => Pending,
(Int::Pending, Int::NoWork) => Pending,
(Int::Done, Int::Pending) => Pending,
(Int::Pending | Int::Done | Int::Empty | Int::NoWork, Int::Empty) => continue,
(Int::Empty, Int::Pending | Int::NoWork) => continue,
(Int::Done, Int::NoWork) => {
self.done = true;
Ready(None)
}
}
};
}
}
}
impl UnfoldExec for DataBlocks {
type Output = JsVal;
fn exec(self) -> Pin<Box<dyn Future<Output = Result<Option<(Self::Output, Self)>, Error>> + Send>>
where
Self: Sized,
{
Box::pin(self.exec())
impl<S> fmt::Debug for BlockStream<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BlockStream")
.field("inp_done", &self.inp_done)
.field("range", &self.range)
.field("max_reads", &self.max_reads)
.field("ts_max", &self.ts_max)
.field("done", &self.done)
.field("complete", &self.complete)
.field("acc", &self.acc)
.field("good_reader", &self.good_reader)
.field("discard_reader", &self.discard_reader)
.field("not_found_hit", &self.not_found_hit)
.field("same_block", &self.same_block)
.finish()
}
}
pub fn blockstream(
channel: Channel,
range: NanoRange,
conf: ChannelArchiver,
) -> impl Stream<Item = Result<JsVal, Error>> {
unfold_stream(DataBlocks::new(channel, range, conf.clone()))
impl<S> Drop for BlockStream<S> {
fn drop(&mut self) {
info!("Drop {:?}", self);
}
}
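The core of BlockStream::poll_next above is a bounded, order-preserving read-ahead: it tops up a FuturesOrdered until max_reads block reads are in flight, then drains results in submission order; the real stream additionally recycles the previous Reader when consecutive blocks live in the same datafile (the good_reader/discard_reader counters). A reduced sketch of just the queue discipline, where fetch stands in for the datafile header and data reads:

use futures_util::stream::{FuturesOrdered, StreamExt};

// Stand-in for read_datafile_header2 + read_data2 on one block.
async fn fetch(i: u32) -> u32 {
    i * 10
}

#[tokio::main]
async fn main() {
    let max_reads = 4;
    let mut refs = 0u32..16; // stand-in for the blockref input stream
    let mut block_reads = FuturesOrdered::new();
    loop {
        // Top up the in-flight queue, mirroring the max_reads guard above.
        while block_reads.len() < max_reads {
            match refs.next() {
                Some(i) => block_reads.push(fetch(i)),
                None => break,
            }
        }
        // Results arrive in submission order even if they complete out of order.
        match block_reads.next().await {
            Some(v) => println!("block {}", v),
            None => break,
        }
    }
}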

View File

@@ -251,7 +251,7 @@ pub async fn read_data2(
}
}
rb.adv(n3);
info!("parsed block with {} / {} events", ntot, evs.tss.len());
//info!("parsed block with {} / {} events", ntot, evs.tss.len());
let evs = ScalarPlainEvents::Double(evs);
let plain = PlainEvents::Scalar(evs);
let item = EventsItem::Plain(plain);

View File

@@ -16,6 +16,9 @@ pub struct RingBuf<F> {
seek_request: u64,
seek_done: u64,
read_done: u64,
small_pos: u64,
small_neg: u64,
bytes_read: u64,
}
impl<F> RingBuf<F>
@@ -33,6 +36,9 @@ where
seek_request: 0,
seek_done: 0,
read_done: 0,
small_pos: 0,
small_neg: 0,
bytes_read: 0,
};
ret.seek(pos).await?;
Ok(ret)
@@ -67,14 +73,16 @@ where
self.rp = 0;
}
}
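// Cap each refill at 8 KiB per read call; presumably to keep individual
// NFS read requests small for the IOPS experiment this commit is about.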
let max = (self.buf.len() - self.wp).min(1024 * 8) + self.wp;
let n = read(
self.file.as_mut().unwrap().borrow_mut(),
&mut self.buf[self.wp..],
&mut self.buf[self.wp..max],
&self.stats,
)
.await?;
self.wp += n;
self.read_done += 1;
self.bytes_read += n as u64;
Ok(n)
}
@@ -90,11 +98,13 @@ where
}
pub async fn seek(&mut self, pos: u64) -> Result<u64, Error> {
let dp = pos as i64 - self.abs as i64 - self.rp as i64;
let dp = pos as i64 - self.rp_abs() as i64;
if dp < 0 && dp > -2048 {
debug!("small NEG seek {}", dp);
} else if dp == 0 {
debug!("zero seek");
// TODO check callsites, some cases could be eliminated.
//debug!("zero seek");
return Ok(pos);
} else if dp > 0 && dp < 2048 {
debug!("small POS seek {}", dp);
}
@@ -116,6 +126,10 @@ where
pub fn rp_abs(&self) -> u64 {
self.abs as u64 + self.rp as u64
}
pub fn bytes_read(&self) -> u64 {
self.bytes_read
}
}
impl<F> fmt::Debug for RingBuf<F> {
@@ -124,12 +138,18 @@ impl<F> fmt::Debug for RingBuf<F> {
.field("abs", &self.abs)
.field("wp", &self.wp)
.field("rp", &self.rp)
.field("seek_request", &self.seek_request)
.field("seek_done", &self.seek_done)
.field("read_done", &self.read_done)
.field("small_pos", &self.small_pos)
.field("small_neg", &self.small_neg)
.field("bytes_read", &self.bytes_read)
.finish()
}
}
impl<F> Drop for RingBuf<F> {
fn drop(&mut self) {
info!("RingBuf Drop {} {}", self.seek_request, self.read_done);
info!("Drop {:?}", self);
}
}

View File

@@ -1,12 +1,15 @@
use crate::response;
use disk::events::PlainEventsJsonQuery;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use http::{header, Method, Request, Response, StatusCode};
use hyper::Body;
use netpod::{log::*, Channel, NanoRange};
use netpod::query::RawEventsQuery;
use netpod::{get_url_query_pairs, log::*, Channel, NanoRange};
use netpod::{NodeConfigCached, APP_JSON_LINES};
use serde::Serialize;
use url::Url;
fn json_lines_stream<S, I>(stream: S) -> impl Stream<Item = Result<Vec<u8>, Error>>
where
@@ -155,6 +158,67 @@ impl ScanChannels {
}
}
pub struct BlockRefStream {}
impl BlockRefStream {
pub fn prefix() -> &'static str {
"/api/4/channelarchiver/blockrefstream"
}
pub fn name() -> &'static str {
"BlockRefStream"
}
pub fn should_handle(path: &str) -> Option<Self> {
if path.starts_with(Self::prefix()) {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
info!("{} handle uri: {:?}", Self::name(), req.uri());
let conf = node_config
.node
.channel_archiver
.as_ref()
.ok_or(Error::with_msg_no_trace(
"this node is not configured as channel archiver",
))?;
let range = NanoRange { beg: 0, end: u64::MAX };
let channel = Channel {
backend: "".into(),
name: "ARIDI-PCT:CURRENT".into(),
};
let s = archapp_wrap::archapp::archeng::blockrefstream::blockref_stream(channel, range, conf.clone());
let s = s.map(|item| match item {
Ok(item) => {
use archapp_wrap::archapp::archeng::blockrefstream::BlockrefItem::*;
match item {
Blockref(_k, jsval) => Ok(jsval),
JsVal(jsval) => Ok(jsval),
}
}
Err(e) => Err(e),
});
let s = json_lines_stream(s);
let s = s.map(|item| match item {
Ok(k) => Ok(k),
Err(e) => {
error!("observe error: {}", e);
Err(e)
}
});
Ok(response(StatusCode::OK)
.header(header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(s))?)
}
}
pub struct BlockStream {}
impl BlockStream {
@@ -191,7 +255,19 @@ impl BlockStream {
backend: "".into(),
name: "ARIDI-PCT:CURRENT".into(),
};
let s = archapp_wrap::archapp::archeng::blockstream::blockstream(channel, range, conf.clone());
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let pairs = get_url_query_pairs(&url);
let read_queue = pairs.get("readQueue").unwrap_or(&"1".to_string()).parse()?;
let s = archapp_wrap::archapp::archeng::blockrefstream::blockref_stream(channel, range.clone(), conf.clone());
let s = Box::pin(s);
let s = archapp_wrap::archapp::archeng::blockstream::BlockStream::new(s, range.clone(), read_queue);
let s = s.map(|item| match item {
Ok(item) => {
//use archapp_wrap::archapp::archeng::blockstream::BlockItem::*;
Ok(item)
}
Err(e) => Err(e),
});
let s = json_lines_stream(s);
let s = s.map(|item| match item {
Ok(k) => Ok(k),

View File

@@ -290,6 +290,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ScanChannels::should_handle(path) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::BlockRefStream::should_handle(path) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::BlockStream::should_handle(path) {
h.handle(req, &node_config).await
} else if path.starts_with("/api/1/requestStatus/") {