List indexfiles in directories

This commit is contained in:
Dominik Werder
2021-10-26 18:11:10 +02:00
parent ade01fb3e2
commit ee376dbd93
25 changed files with 2739 additions and 1557 deletions

View File

@@ -1,17 +1,21 @@
use crate::archeng::{
open_read, read_channel, read_data_1, read_datafile_header, read_index_datablockref, search_record,
index_file_path_list, open_read, read_channel, read_data_1, read_datafile_header, read_index_datablockref,
search_record, search_record_expand, StatsChannel,
};
use crate::EventsItem;
use crate::eventsitem::EventsItem;
use crate::storagemerge::StorageMerge;
use crate::timed::Timed;
use async_channel::{Receiver, Sender};
use err::Error;
use futures_core::{Future, Stream};
use futures_util::{FutureExt, StreamExt};
use items::{RangeCompletableItem, Sitemty, StreamItem, WithLen};
use items::{inspect_timestamps, RangeCompletableItem, Sitemty, StreamItem, WithLen};
use netpod::{log::*, DataHeaderPos, FilePos, Nanos};
use netpod::{Channel, NanoRange};
use std::collections::VecDeque;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
type FR = (Option<Sitemty<EventsItem>>, Box<dyn FretCb>);
@@ -20,77 +24,133 @@ trait FretCb {
fn call(&mut self, stream: &mut Pin<&mut DatablockStream>);
}
async fn datablock_stream(
range: NanoRange,
channel: Channel,
base_dirs: VecDeque<PathBuf>,
expand: bool,
tx: Sender<Sitemty<EventsItem>>,
) {
match datablock_stream_inner(range, channel, base_dirs, expand, tx.clone()).await {
Ok(_) => {}
Err(e) => match tx.send(Err(e)).await {
Ok(_) => {}
Err(e) => {
if false {
error!("can not send. error: {}", e);
}
}
},
// Process-wide counter of failed channel sends, used to rate-limit the error log.
static CHANNEL_SEND_ERROR: AtomicUsize = AtomicUsize::new(0);

/// Record one failed send on the output channel.
///
/// Logs at most the first 10 occurrences so a disconnected receiver does not
/// flood the log.
// NOTE(review): `AcqRel` ordering looks stronger than needed for a plain
// counter — `Relaxed` would likely suffice; confirm no other code relies on
// the ordering.
fn channel_send_error() {
    let c = CHANNEL_SEND_ERROR.fetch_add(1, Ordering::AcqRel);
    if c < 10 {
        error!("CHANNEL_SEND_ERROR {}", c);
    }
}
async fn datablock_stream_inner(
async fn datablock_stream(
range: NanoRange,
channel: Channel,
base_dirs: VecDeque<PathBuf>,
index_files_index_path: PathBuf,
_base_dirs: VecDeque<PathBuf>,
expand: bool,
tx: Sender<Sitemty<EventsItem>>,
max_events: u64,
) {
match datablock_stream_inner(range, channel, expand, index_files_index_path, tx.clone(), max_events).await {
Ok(_) => {}
Err(e) => {
if let Err(_) = tx.send(Err(e)).await {
channel_send_error();
}
}
}
}
async fn datablock_stream_inner_single_index(
range: NanoRange,
channel: Channel,
index_path: PathBuf,
expand: bool,
tx: Sender<Sitemty<EventsItem>>,
max_events: u64,
) -> Result<(), Error> {
let basename = channel
.name()
.split("-")
.next()
.ok_or(Error::with_msg_no_trace("can not find base for channel"))?;
for base in base_dirs {
debug!(
"search for {:?} with basename: {} in path {:?}",
channel, basename, base
);
// TODO need to try both:
let index_path = base.join(format!("archive_{}_SH", basename)).join("index");
let res = open_read(index_path.clone()).await;
debug!("tried to open index file: {:?}", res);
if let Ok(mut index_file) = res {
if let Some(basics) = read_channel(&mut index_file, channel.name()).await? {
let mut events_tot = 0;
let stats = &StatsChannel::new(tx.clone());
debug!("try to open index file: {:?}", index_path);
let res = open_read(index_path.clone(), stats).await;
debug!("opened index file: {:?} {:?}", index_path, res);
match res {
Ok(mut index_file) => {
if let Some(basics) = read_channel(&mut index_file, channel.name(), stats).await? {
let beg = Nanos { ns: range.beg };
let mut expand_beg = expand;
let mut index_ts_max = 0;
let mut search_ts = beg.clone();
let mut last_data_file_path = PathBuf::new();
let mut last_data_file_pos = DataHeaderPos(0);
loop {
// TODO for expand mode, this needs another search function.
let (res, _stats) =
search_record(&mut index_file, basics.rtree_m, basics.rtree_start_pos, search_ts).await?;
let timed_search = Timed::new("search next record");
let (res, _stats) = if expand_beg {
// TODO even though this is an entry in the index, it may reference
// non-existent blocks.
// Therefore, lower expand_beg flag at some later stage only if we've really
// found at least one event in the block.
expand_beg = false;
search_record_expand(
&mut index_file,
basics.rtree_m,
basics.rtree_start_pos,
search_ts,
stats,
)
.await?
} else {
search_record(
&mut index_file,
basics.rtree_m,
basics.rtree_start_pos,
search_ts,
stats,
)
.await?
};
drop(timed_search);
if let Some(nrec) = res {
let rec = nrec.rec();
trace!("found record: {:?}", rec);
let pos = FilePos { pos: rec.child_or_id };
// TODO rename Datablock? → IndexNodeDatablock
trace!("READ Datablock FROM {:?}\n", pos);
let datablock = read_index_datablockref(&mut index_file, pos).await?;
let datablock = read_index_datablockref(&mut index_file, pos, stats).await?;
trace!("Datablock: {:?}\n", datablock);
let data_path = index_path.parent().unwrap().join(datablock.file_name());
if data_path == last_data_file_path && datablock.data_header_pos() == last_data_file_pos {
debug!("skipping because it is the same block");
} else {
trace!("try to open data_path: {:?}", data_path);
match open_read(data_path.clone()).await {
match open_read(data_path.clone(), stats).await {
Ok(mut data_file) => {
let datafile_header =
read_datafile_header(&mut data_file, datablock.data_header_pos()).await?;
read_datafile_header(&mut data_file, datablock.data_header_pos(), stats)
.await?;
trace!("datafile_header -------------- HEADER\n{:?}", datafile_header);
let events = read_data_1(&mut data_file, &datafile_header).await?;
let events =
read_data_1(&mut data_file, &datafile_header, range.clone(), expand_beg, stats)
.await?;
if false {
let msg = inspect_timestamps(&events, range.clone());
trace!("datablock_stream_inner_single_index read_data_1\n{}", msg);
}
{
let mut ts_max = 0;
use items::WithTimestamps;
for i in 0..events.len() {
let ts = events.ts(i);
if ts < ts_max {
error!("unordered event within block at ts {}", ts);
break;
} else {
ts_max = ts;
}
if ts < index_ts_max {
error!(
"unordered event in index branch ts {} index_ts_max {}",
ts, index_ts_max
);
break;
} else {
index_ts_max = ts;
}
}
}
trace!("Was able to read data: {} events", events.len());
events_tot += events.len() as u64;
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events)));
tx.send(item).await?;
}
@@ -111,12 +171,59 @@ async fn datablock_stream_inner(
warn!("nothing found, break");
break;
}
if events_tot >= max_events {
warn!("reached events_tot {} max_events {}", events_tot, max_events);
break;
}
}
} else {
warn!("can not read channel basics from {:?}", index_path);
}
} else {
Ok(())
}
Err(e) => {
warn!("can not find index file at {:?}", index_path);
Err(Error::with_msg_no_trace(format!("can not open index file: {}", e)))
}
}
}
async fn datablock_stream_inner(
range: NanoRange,
channel: Channel,
expand: bool,
index_files_index_path: PathBuf,
tx: Sender<Sitemty<EventsItem>>,
max_events: u64,
) -> Result<(), Error> {
let stats = &StatsChannel::new(tx.clone());
let index_file_path_list = index_file_path_list(channel.clone(), index_files_index_path, stats).await?;
let mut inner_rxs = vec![];
let mut names = vec![];
for index_path in index_file_path_list {
let (tx, rx) = async_channel::bounded(2);
let task = datablock_stream_inner_single_index(
range.clone(),
channel.clone(),
(&index_path).into(),
expand,
tx,
max_events,
);
taskrun::spawn(task);
inner_rxs.push(Box::pin(rx) as Pin<Box<dyn Stream<Item = Sitemty<EventsItem>> + Send>>);
names.push(index_path.to_str().unwrap().into());
}
let task = async move {
let mut inp = StorageMerge::new(inner_rxs, names, range.clone());
while let Some(k) = inp.next().await {
if let Err(_) = tx.send(k).await {
channel_send_error();
break;
}
}
};
taskrun::spawn(task);
Ok(())
}
@@ -132,14 +239,22 @@ pub struct DatablockStream {
}
impl DatablockStream {
pub fn for_channel_range(range: NanoRange, channel: Channel, base_dirs: VecDeque<PathBuf>, expand: bool) -> Self {
pub fn for_channel_range(
range: NanoRange,
channel: Channel,
base_dirs: VecDeque<PathBuf>,
expand: bool,
max_events: u64,
) -> Self {
let (tx, rx) = async_channel::bounded(1);
taskrun::spawn(datablock_stream(
range.clone(),
channel.clone(),
"/index/c5mapped".into(),
base_dirs.clone(),
expand.clone(),
tx,
max_events,
));
let ret = Self {
range,
@@ -151,6 +266,10 @@ impl DatablockStream {
done: false,
complete: false,
};
// TODO keeping for compatibility at the moment:
let _ = &ret.range;
let _ = &ret.channel;
let _ = &ret.expand;
ret
}
@@ -169,7 +288,7 @@ impl DatablockStream {
(None, Box::new(Cb {}))
}
async fn start_with_base_dir(path: PathBuf) -> FR {
async fn start_with_base_dir(_path: PathBuf) -> FR {
warn!("start_with_base_dir");
struct Cb {}
impl FretCb for Cb {
@@ -199,9 +318,7 @@ impl Stream for DatablockStream {
} else if self.done {
self.complete = true;
Ready(None)
} else if true {
self.rx.poll_next_unpin(cx)
} else {
} else if false {
match self.fut.poll_unpin(cx) {
Ready((k, mut fr)) => {
fr.call(&mut self);
@@ -212,6 +329,8 @@ impl Stream for DatablockStream {
}
Pending => Pending,
}
} else {
self.rx.poll_next_unpin(cx)
};
}
}
@@ -219,9 +338,8 @@ impl Stream for DatablockStream {
#[cfg(test)]
mod test {
use crate::EventsItem;
use super::DatablockStream;
use crate::eventsitem::EventsItem;
use chrono::{DateTime, Utc};
use err::Error;
use futures_util::StreamExt;
@@ -258,7 +376,7 @@ mod test {
.map(PathBuf::from)
.collect();
let expand = false;
let datablocks = DatablockStream::for_channel_range(range.clone(), channel, base_dirs, expand);
let datablocks = DatablockStream::for_channel_range(range.clone(), channel, base_dirs, expand, u64::MAX);
let filtered = RangeFilter::<_, EventsItem>::new(datablocks, range, expand);
let mut stream = filtered;
while let Some(block) = stream.next().await {

View File

@@ -1,17 +0,0 @@
use crate::EventsItem;
use futures_core::Stream;
use items::Sitemty;
use std::pin::Pin;
use std::task::{Context, Poll};
// Placeholder event stream; carries no state yet.
pub struct DataStream {}

impl Stream for DataStream {
    type Item = Sitemty<EventsItem>;

    /// Not implemented yet — polling panics via `todo!()`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Silence unused-variable warnings until the implementation lands.
        let _ = self;
        let _ = cx;
        todo!()
    }
}

View File

@@ -0,0 +1,312 @@
use crate::wrap_task;
use async_channel::Receiver;
use err::Error;
use futures_core::Future;
use futures_core::Stream;
use futures_util::stream::unfold;
use futures_util::FutureExt;
use netpod::log::*;
use netpod::ChannelArchiver;
use netpod::Database;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::read_dir;
use tokio_postgres::Client as PgClient;
/// Scan the node's data base paths and emit the path of every file named
/// `index`, found either directly in a base path or one directory level below.
///
/// The scan runs as a background task; results (and any scan error, forwarded
/// by `wrap_task`) arrive on the returned receiver as `Result<PathBuf, Error>`.
pub fn list_index_files(node: &ChannelArchiver) -> Receiver<Result<PathBuf, Error>> {
    let node = node.clone();
    let (tx, rx) = async_channel::bounded(4);
    let tx2 = tx.clone();
    let task = async move {
        for bp in &node.data_base_paths {
            let mut rd = read_dir(bp).await?;
            while let Some(e) = rd.next_entry().await? {
                let ft = e.file_type().await?;
                if ft.is_dir() {
                    // One level down: look for `index` files inside each subdirectory.
                    // (A former third-level scan guarded by `if false && …` was dead
                    // code and has been removed.)
                    let mut rd = read_dir(e.path()).await?;
                    while let Some(e) = rd.next_entry().await? {
                        let ft = e.file_type().await?;
                        if ft.is_file() && e.file_name().to_string_lossy() == "index" {
                            tx.send(Ok(e.path())).await?;
                        }
                    }
                } else if ft.is_file() && e.file_name().to_string_lossy() == "index" {
                    // `index` file directly in the base path.
                    tx.send(Ok(e.path())).await?;
                }
            }
        }
        Ok::<_, Error>(())
    };
    wrap_task(task, tx2);
    rx
}
// Early stub for a stream-based index-file scanner; superseded by
// `ScanIndexFiles` + `unfold_stream` below.
pub struct ScanIndexFiles0 {}

impl Stream for ScanIndexFiles0 {
    type Item = ();

    /// Not implemented — polling panics via `todo!()`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let _ = cx;
        todo!()
    }
}
/// Return the first-level subdirectories of every configured data base path.
///
/// A top-level entry whose name contains `index` is unexpected at this level
/// and is only logged as a warning; it is never included in the result.
pub async fn get_level_0(conf: ChannelArchiver) -> Result<Vec<PathBuf>, Error> {
    let mut dirs = vec![];
    for base in &conf.data_base_paths {
        let mut entries = read_dir(base).await?;
        while let Some(entry) = entries.next_entry().await? {
            if entry.file_name().to_string_lossy().contains("index") {
                warn!("Top-level data path contains `index` entry");
            }
            if entry.file_type().await?.is_dir() {
                dirs.push(entry.path());
            }
        }
    }
    Ok(dirs)
}
/// Search each given directory for a file literally named `index` and return
/// the paths of all such files.
pub async fn get_level_1(lev0: Vec<PathBuf>) -> Result<Vec<PathBuf>, Error> {
    let mut found = vec![];
    for dir in lev0 {
        let mut entries = read_dir(dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let is_file = entry.file_type().await?.is_file();
            if is_file && entry.file_name().to_string_lossy() == "index" {
                found.push(entry.path());
            }
        }
    }
    Ok(found)
}
/// Connect to PostgreSQL using the given database configuration and return the
/// client handle.
///
/// The connection object is driven by a detached background task which logs a
/// connection error if one occurs.
pub async fn database_connect(db_config: &Database) -> Result<PgClient, Error> {
    let d = db_config;
    // NOTE(review): port is hardcoded to 5432 — confirm whether `Database`
    // carries a configurable port that should be used here instead.
    let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name);
    let (cl, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await?;
    // TODO monitor connection drop.
    // Join handle is intentionally dropped; the task only logs and exits.
    let _cjh = tokio::spawn(async move {
        if let Err(e) = conn.await {
            error!("connection error: {}", e);
        }
        Ok::<_, Error>(())
    });
    Ok(cl)
}
/// A step-wise state machine that can be adapted into a `Stream` via
/// [`unfold_stream`].
///
/// `exec` consumes the current state and resolves to `Ok(Some((item, next)))`
/// while there is more work, or `Ok(None)` when finished.
pub trait UnfoldExec {
    type Output: Send;

    fn exec(self) -> Pin<Box<dyn Future<Output = Result<Option<(Self::Output, Self)>, Error>> + Send>>
    where
        Self: Sized;
}
/// Adapt an [`UnfoldExec`] state machine into a `Stream` of its outputs.
///
/// Each step either yields the next item together with the successor state,
/// ends the stream (`Ok(None)`), or fails — in which case the error is emitted
/// once and the stream terminates on the next poll.
pub fn unfold_stream<St, T>(st: St) -> impl Stream<Item = Result<T, Error>>
where
    St: UnfoldExec<Output = T> + Send,
    T: Send,
{
    enum UnfoldState<St> {
        Running(St),
        Done,
    }
    unfold(UnfoldState::Running(st), |state| async move {
        match state {
            UnfoldState::Done => None,
            UnfoldState::Running(machine) => match machine.exec().await {
                Ok(Some((item, next))) => Some((Ok(item), UnfoldState::Running(next))),
                Ok(None) => None,
                Err(e) => Some((Err(e), UnfoldState::Done)),
            },
        }
    })
}
/// Steps of the index-file scan driven through [`unfold_stream`].
enum ScanIndexFilesSteps {
    // Discover first-level subdirectories of the data base paths.
    Level0,
    // Scan the collected directories for `index` files and record them.
    Level1(Vec<PathBuf>),
    Done,
}
/// State machine that scans for index files and stores their paths in the
/// database; see [`scan_index_files`].
struct ScanIndexFiles {
    conf: ChannelArchiver,
    // Current step of the scan.
    steps: ScanIndexFilesSteps,
}
impl ScanIndexFiles {
    /// Create the scanner in its initial state.
    fn new(conf: ChannelArchiver) -> Self {
        Self {
            conf,
            steps: ScanIndexFilesSteps::Level0,
        }
    }

    /// Run the next step of the scan.
    ///
    /// Returns `Ok(Some((status, self)))` with a human-readable status line
    /// while more work remains, and `Ok(None)` once all steps completed.
    async fn exec(mut self) -> Result<Option<(String, Self)>, Error> {
        match self.steps {
            ScanIndexFilesSteps::Level0 => {
                let res = get_level_0(self.conf.clone()).await?;
                self.steps = ScanIndexFilesSteps::Level1(res);
                // Plain literal — `format!` was unnecessary here.
                Ok(Some((String::from("level 0 done"), self)))
            }
            ScanIndexFilesSteps::Level1(paths) => {
                let paths = get_level_1(paths).await?;
                info!("collected {} level 1 paths", paths.len());
                let dbc = database_connect(&self.conf.database).await?;
                // Statement is loop-invariant; prepare-and-run once per path.
                let sql = "insert into indexfiles (path) values ($1) on conflict do nothing";
                for p in paths {
                    // `execute` is the right call for statements that return no
                    // rows (`query` was used before).
                    dbc.execute(sql, &[&p.to_string_lossy()]).await?;
                }
                self.steps = ScanIndexFilesSteps::Done;
                Ok(Some((String::from("level 1 done"), self)))
            }
            ScanIndexFilesSteps::Done => Ok(None),
        }
    }
}
impl UnfoldExec for ScanIndexFiles {
    type Output = String;

    // Bridge the inherent async `exec` into the boxed-future trait method.
    fn exec(self) -> Pin<Box<dyn Future<Output = Result<Option<(Self::Output, Self)>, Error>> + Send>>
    where
        Self: Sized,
    {
        Box::pin(self.exec())
    }
}
/// Stream the progress of an index-file scan as human-readable status lines.
///
/// Drives a [`ScanIndexFiles`] state machine via [`unfold_stream`]; each step
/// yields one status string, and errors terminate the stream.
pub fn scan_index_files(conf: ChannelArchiver) -> impl Stream<Item = Result<String, Error>> {
    // `conf` is owned here — no clone needed. (A commented-out hand-rolled
    // `unfold` equivalent, superseded by `unfold_stream`, was removed.)
    unfold_stream(ScanIndexFiles::new(conf))
}
/// Minimal `unfold` example: yields the same number as a string forever.
pub fn unfold1() -> impl Stream<Item = String> {
    // `to_string()` instead of `format!("{}", st)` for a plain stringify.
    unfold(123u32, |st| async move { Some((st.to_string(), st)) })
}
/// Placeholder for an earlier unfold experiment; panics via `todoval()`.
/// The abandoned draft that lived here as commented-out code was removed.
pub fn unfold2(_conf: ChannelArchiver) -> () {
    err::todoval()
}
// -------------------------------------------------
/// Steps of the channel scan driven through [`unfold_stream`].
enum ScanChannelsSteps {
    Start,
    // Pick the next index file due for a channel search from the database.
    SelectIndexFile,
    Done,
}
/// State machine that looks up index files pending a channel search; see
/// [`scan_channels`].
struct ScanChannels {
    conf: ChannelArchiver,
    // Current step of the scan.
    steps: ScanChannelsSteps,
}
impl ScanChannels {
    /// Create the scanner in its initial state.
    fn new(conf: ChannelArchiver) -> Self {
        Self {
            conf,
            steps: ScanChannelsSteps::Start,
        }
    }

    /// Run the next step.
    ///
    /// Yields one human-readable status line per step and `Ok(None)` once the
    /// scan is finished.
    async fn exec(mut self) -> Result<Option<(String, Self)>, Error> {
        use ScanChannelsSteps::*;
        match self.steps {
            Start => {
                self.steps = SelectIndexFile;
                // Plain literal — `format!` was unnecessary here.
                Ok(Some((String::from("Start"), self)))
            }
            SelectIndexFile => {
                let dbc = database_connect(&self.conf.database).await?;
                let sql =
                    "select path from indexfiles where ts_last_channel_search < now() - interval '1 hour' limit 1";
                let rows = dbc.query(sql, &[]).await?;
                // Collect via iterator instead of a manual push loop.
                let paths: Vec<String> = rows.iter().map(|row| row.get::<_, String>(0)).collect();
                self.steps = Done;
                Ok(Some((format!("SelectIndexFile {:?}", paths), self)))
            }
            Done => Ok(None),
        }
    }
}
impl UnfoldExec for ScanChannels {
    type Output = String;

    // Bridge the inherent async `exec` into the boxed-future trait method.
    fn exec(self) -> Pin<Box<dyn Future<Output = Result<Option<(Self::Output, Self)>, Error>> + Send>>
    where
        Self: Sized,
    {
        Box::pin(self.exec())
    }
}
/// Stream the progress of a channel scan as human-readable status lines.
pub fn scan_channels(conf: ChannelArchiver) -> impl Stream<Item = Result<String, Error>> {
    // `conf` is owned here — no clone needed.
    unfold_stream(ScanChannels::new(conf))
}

View File

@@ -4,6 +4,7 @@ use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::Framable;
use netpod::ChannelConfigQuery;
use netpod::{query::RawEventsQuery, ChannelArchiver};
use std::pin::Pin;
use streams::rangefilter::RangeFilter;
@@ -12,27 +13,35 @@ pub async fn make_event_pipe(
evq: &RawEventsQuery,
conf: &ChannelArchiver,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
// In order to extract something from the channel, need to look up first the type of the channel.
//let ci = channel_info(&evq.channel, aa).await?;
/*let mut inps = vec![];
for p1 in &aa.data_base_paths {
let p2 = p1.clone();
let p3 = make_single_event_pipe(evq, p2).await?;
inps.push(p3);
}
let sm = StorageMerge {
inprng: inps.len() - 1,
current_inp_item: (0..inps.len()).into_iter().map(|_| None).collect(),
completed_inps: vec![false; inps.len()],
inps,
};*/
let range = evq.range.clone();
let channel = evq.channel.clone();
let expand = evq.agg_kind.need_expand();
let data = DatablockStream::for_channel_range(range.clone(), channel, conf.data_base_paths.clone().into(), expand);
// TODO I need the numeric type here which I expect for that channel in order to construct FrameMaker.
// TODO Need to pass that requirement down to disk reader: error if type changes.
let channel_config = {
let q = ChannelConfigQuery {
channel: channel.clone(),
range: range.clone(),
};
crate::archeng::channel_config(&q, conf).await?
};
let data = DatablockStream::for_channel_range(
range.clone(),
channel,
conf.data_base_paths.clone().into(),
expand,
u64::MAX,
);
let filtered = RangeFilter::new(data, range, expand);
let stream = filtered;
let mut frame_maker = Box::new(FrameMaker::untyped(evq.agg_kind.clone())) as Box<dyn FrameMakerTrait>;
let mut frame_maker = Box::new(FrameMaker::with_item_type(
channel_config.scalar_type.clone(),
channel_config.shape.clone(),
evq.agg_kind.clone(),
)) as Box<dyn FrameMakerTrait>;
let ret = stream.map(move |j| frame_maker.make_frame(j));
Ok(Box::pin(ret))
}