From d6357954c13f94519d6690a62193dd66e4cda60f Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 14 May 2021 16:45:08 +0200
Subject: [PATCH] WIP

---
 disk/src/agg.rs          |  4 ++++
 disk/src/binned.rs       | 32 ++++++++++++++-----------
 disk/src/cache.rs        |  4 +---
 disk/src/dataopen.rs     | 51 +++++++++++++++++++++++++++-------------
 disk/src/eventblobs.rs   | 34 ++++++++++++++++++---------
 disk/src/eventchunker.rs |  2 ++
 disk/src/index.rs        | 10 ++++----
 disk/src/lib.rs          |  4 ++--
 disk/src/raw/conn.rs     |  4 +++-
 disk/src/streamlog.rs    | 10 ++++++++
 10 files changed, 104 insertions(+), 51 deletions(-)

diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index ebacf45..7387bf6 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -6,6 +6,7 @@ use super::eventchunker::EventFull;
 use crate::agg::binnedt::AggregatableTdim;
 use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
 use crate::eventchunker::EventChunkerItem;
+use crate::streamlog::LogItem;
 use bytes::BytesMut;
 use err::Error;
 use futures_core::Stream;
@@ -508,6 +509,7 @@ pub enum Dim1F32StreamItem {
     Values(ValuesDim1),
     RangeComplete,
     EventDataReadStats(EventDataReadStats),
+    Log(LogItem),
 }
 
 impl Stream for Dim1F32Stream
@@ -540,6 +542,7 @@ where
                     }
                 },
                 EventChunkerItem::RangeComplete => Ready(Some(Ok(Dim1F32StreamItem::RangeComplete))),
+                EventChunkerItem::Log(item) => Ready(Some(Ok(Dim1F32StreamItem::Log(item)))),
                 EventChunkerItem::EventDataReadStats(stats) => {
                     let ret = Dim1F32StreamItem::EventDataReadStats(stats);
                     Ready(Some(Ok(ret)))
@@ -587,6 +590,7 @@ impl AggregatableXdim1Bin for Dim1F32StreamItem {
             Dim1F32StreamItem::EventDataReadStats(stats) => {
                 MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats)
             }
+            Dim1F32StreamItem::Log(item) => MinMaxAvgScalarEventBatchStreamItem::Log(item),
             Dim1F32StreamItem::RangeComplete => MinMaxAvgScalarEventBatchStreamItem::RangeComplete,
         }
     }
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 63ae254..d937d2f 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -160,27 +160,31 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
         };
         match item {
             Some(item) => {
+                use MinMaxAvgScalarBinBatchStreamItem::*;
                 match item {
-                    Ok(item) => {
-                        match item {
-                            MinMaxAvgScalarBinBatchStreamItem::Values(mut vals) => {
-                                batch.ts1s.append(&mut vals.ts1s);
-                                batch.ts2s.append(&mut vals.ts2s);
-                                batch.counts.append(&mut vals.counts);
-                                batch.mins.append(&mut vals.mins);
-                                batch.maxs.append(&mut vals.maxs);
-                                batch.avgs.append(&mut vals.avgs);
-                            }
-                            _ => {}
+                    Ok(item) => match item {
+                        Values(mut vals) => {
+                            info!("APPEND BATCH {}", vals.ts1s.len());
+                            batch.ts1s.append(&mut vals.ts1s);
+                            batch.ts2s.append(&mut vals.ts2s);
+                            batch.counts.append(&mut vals.counts);
+                            batch.mins.append(&mut vals.mins);
+                            batch.maxs.append(&mut vals.maxs);
+                            batch.avgs.append(&mut vals.avgs);
+                            i1 += 1;
                         }
-                        serde_json::Value::String(format!("all good"))
+                        Log(_) => {}
+                        EventDataReadStats(_) => {}
+                        RangeComplete => {}
+                    },
+                    Err(e) => {
+                        // TODO Need to use some flags to get good enough error message for remote user.
+                        Err(e)?
                     }
-                    Err(e) => serde_json::Value::String(format!("{:?}", e)),
                 };
             }
             None => break,
         }
-        i1 += 1;
     }
     let mut ret = BinnedJsonResult {
         ts_bin_edges: batch.ts1s,
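Note on the binned.rs hunk: binned_json now folds the heterogeneous item stream into a single batch and propagates the first error via `?` instead of stringifying it into the JSON body. A minimal standalone sketch of that fold; `Item` and `Batch` are illustrative stand-ins, not the crate's `MinMaxAvgScalarBinBatchStreamItem` and result types:

// Stand-ins for the real stream item and batch types.
enum Item {
    Values(Vec<(u64, f32)>),
    Log(String),
    RangeComplete,
}

#[derive(Default)]
struct Batch {
    ts1s: Vec<u64>,
    avgs: Vec<f32>,
}

// Fold a finished stream of items into one batch; the first Err aborts.
fn drain(items: Vec<Result<Item, String>>) -> Result<Batch, String> {
    let mut batch = Batch::default();
    for item in items {
        match item? {
            // Only value items contribute to the JSON result.
            Item::Values(vals) => {
                for (ts, avg) in vals {
                    batch.ts1s.push(ts);
                    batch.avgs.push(avg);
                }
            }
            // Logs and completion markers pass through without changing the batch.
            Item::Log(_) | Item::RangeComplete => {}
        }
    }
    Ok(batch)
}

fn main() {
    let items = vec![
        Ok(Item::Values(vec![(0, 1.0), (1, 2.0)])),
        Ok(Item::Log("read 2 events".into())),
        Ok(Item::RangeComplete),
    ];
    assert_eq!(drain(items).unwrap().ts1s.len(), 2);
}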
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 117f8ce..2abe2ed 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -61,9 +61,7 @@ impl BinnedQuery {
         let params = netpod::query_params(req.uri.query());
         let beg_date = params.get("beg_date").ok_or(Error::with_msg("missing beg_date"))?;
         let end_date = params.get("end_date").ok_or(Error::with_msg("missing end_date"))?;
-        let disk_stats_every = params
-            .get("disk_stats_every_kb")
-            .ok_or(Error::with_msg("missing disk_stats_every_kb"))?;
+        let disk_stats_every = params.get("disk_stats_every_kb").map_or("2000", |k| k);
         let disk_stats_every = disk_stats_every
             .parse()
             .map_err(|e| Error::with_msg(format!("can not parse disk_stats_every_kb {:?}", e)))?;
diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs
index 33b4d56..8eec820 100644
--- a/disk/src/dataopen.rs
+++ b/disk/src/dataopen.rs
@@ -11,7 +11,10 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom};
 
 pub struct OpenedFile {
     pub path: PathBuf,
-    pub file: File,
+    pub file: Option<File>,
+    pub positioned: bool,
+    pub index: bool,
+    pub nreads: u32,
 }
 
 pub fn open_files(
@@ -78,8 +81,7 @@
         debug!("opening path {:?}", &path);
         let mut file = OpenOptions::new().read(true).open(&path).await?;
         debug!("opened file {:?} {:?}", &path, &file);
-        let mut use_file = false;
-        {
+        let ret = {
            let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
            match OpenOptions::new().read(true).open(&index_path).await {
                Ok(mut index_file) => {
@@ -111,14 +113,22 @@
                    index_file.read_exact(&mut buf).await?;
                    match super::index::find_ge(range.beg, &buf[2..])? {
                        Some(o) => {
-                           debug!("FOUND ts IN INDEX: {:?}", o);
                            file.seek(SeekFrom::Start(o.1)).await?;
-                           use_file = true;
-                       }
-                       None => {
-                           debug!("NOT FOUND IN INDEX");
-                           use_file = false;
+                           OpenedFile {
+                               file: Some(file),
+                               path,
+                               positioned: true,
+                               index: true,
+                               nreads: 0,
+                           }
                        }
+                       None => OpenedFile {
+                           file: None,
+                           path,
+                           positioned: false,
+                           index: true,
+                           nreads: 0,
+                       },
                    }
                }
                Err(e) => match e.kind() {
@@ -126,19 +136,28 @@
                        let res = super::index::position_static_len_datafile(file, range.beg).await?;
                        file = res.0;
                        if res.1 {
-                           use_file = true;
+                           OpenedFile {
+                               file: Some(file),
+                               path,
+                               positioned: true,
+                               index: false,
+                               nreads: res.2,
+                           }
                        } else {
-                           use_file = false;
+                           OpenedFile {
+                               file: None,
+                               path,
+                               positioned: false,
+                               index: false,
+                               nreads: 0,
+                           }
                        }
                    }
                    _ => Err(e)?,
                },
            }
-       }
-       if use_file {
-           let ret = OpenedFile { file, path };
-           chtx.send(Ok(ret)).await?;
-       }
+       };
+       chtx.send(Ok(ret)).await?;
    }
    // TODO keep track of number of running
    debug!("open_files_inner done");
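Note on the dataopen.rs hunks: every candidate file now yields an `OpenedFile`, and `file: Option<File>` encodes whether a handle was actually positioned inside the requested range, so consumers must match on it rather than assume a handle. A consumer-side sketch of that contract with simplified std types (the real code uses tokio's async `File`; names here are stand-ins):

use std::fs::File;
use std::path::PathBuf;

// Simplified stand-in for the new OpenedFile: the path is always reported,
// the handle is only present when the file could be positioned in range.
struct OpenedFile {
    path: PathBuf,
    file: Option<File>,
    positioned: bool,
    nreads: u32,
}

fn handle(opened: OpenedFile) {
    match opened.file {
        // A handle means the file is seeked to the first event in range.
        Some(_file) => println!(
            "stream from {:?} (positioned={}, {} index reads)",
            opened.path, opened.positioned, opened.nreads
        ),
        // No handle: nothing at or after the range start, only worth a log line.
        None => println!("skip {:?}", opened.path),
    }
}

fn main() {
    handle(OpenedFile {
        path: PathBuf::from("data_0"),
        file: None,
        positioned: false,
        nreads: 0,
    });
}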
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index aab7874..4476f00 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -1,9 +1,12 @@
 use crate::dataopen::{open_files, OpenedFile};
 use crate::eventchunker::{EventChunker, EventChunkerConf, EventChunkerItem};
 use crate::file_content_stream;
+use crate::streamlog::LogItem;
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
+use netpod::log::*;
+use netpod::timeunits::SEC;
 use netpod::{ChannelConfig, NanoRange, Node};
 use std::pin::Pin;
 use std::sync::atomic::AtomicU64;
@@ -30,6 +33,7 @@ impl EventBlobsComplete {
         buffer_size: usize,
         event_chunker_conf: EventChunkerConf,
     ) -> Self {
+        info!("EventBlobsComplete::new beg {}", range.beg / SEC);
         Self {
             file_chan: open_files(&range, &channel_config, node),
             evs: None,
@@ -69,17 +73,25 @@
                 Ready(Some(k)) => match k {
                     Ok(file) => {
                         let path = file.path;
-                        let inp = Box::pin(file_content_stream(file.file, self.buffer_size as usize));
-                        let chunker = EventChunker::from_event_boundary(
-                            inp,
-                            self.channel_config.clone(),
-                            self.range.clone(),
-                            self.event_chunker_conf.clone(),
-                            path,
-                            self.max_ts.clone(),
-                        );
-                        self.evs = Some(chunker);
-                        continue 'outer;
+                        //info!("handling {:?}", path);
+                        let item = LogItem::quick(Level::INFO, format!("handle file {:?}", path));
+                        match file.file {
+                            Some(file) => {
+                                let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
+                                let chunker = EventChunker::from_event_boundary(
+                                    inp,
+                                    self.channel_config.clone(),
+                                    self.range.clone(),
+                                    self.event_chunker_conf.clone(),
+                                    path,
+                                    self.max_ts.clone(),
+                                );
+                                self.evs = Some(chunker);
+                            }
+                            None => {}
+                        }
+                        Ready(Some(Ok(EventChunkerItem::Log(item))))
+                        //continue 'outer;
                     }
                     Err(e) => {
                         self.errored = true;
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index 3e34ed7..3d4a63b 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -1,3 +1,4 @@
+use crate::streamlog::LogItem;
 use crate::{FileChunkRead, NeedMinBuffer};
 use bitshuffle::bitshuffle_decompress;
 use bytes::{Buf, BytesMut};
@@ -348,6 +349,7 @@ pub enum EventChunkerItem {
     Events(EventFull),
     RangeComplete,
     EventDataReadStats(EventDataReadStats),
+    Log(LogItem),
 }
 
 impl Stream for EventChunker {
diff --git a/disk/src/index.rs b/disk/src/index.rs
index 65e9ca4..eb8d954 100644
--- a/disk/src/index.rs
+++ b/disk/src/index.rs
@@ -140,7 +140,7 @@ pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, Nanos), Er
     Ok(ev)
 }
 
-pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(File, bool), Error> {
+pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(File, bool, u32), Error> {
     let flen = file.seek(SeekFrom::End(0)).await?;
     file.seek(SeekFrom::Start(0)).await?;
     let mut buf = vec![0; 1024];
@@ -160,18 +160,19 @@ pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(F
         )))?;
     }
     let y = t.1.ns;
+    let mut nreads = 2;
     if x >= beg {
         file.seek(SeekFrom::Start(j)).await?;
-        return Ok((file, true));
+        return Ok((file, true, nreads));
     }
     if y < beg {
         file.seek(SeekFrom::Start(j)).await?;
-        return Ok((file, false));
+        return Ok((file, false, nreads));
     }
     loop {
         if k - j < 2 * evlen {
             file.seek(SeekFrom::Start(k)).await?;
-            return Ok((file, true));
+            return Ok((file, true, nreads));
         }
         let m = j + (k - j) / 2 / evlen * evlen;
         let t = read_event_at(m, &mut file).await?;
@@ -181,6 +182,7 @@ pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(F
             t.0, evlen
         )))?;
     }
+    nreads += 1;
     let x = t.1.ns;
     if x < beg {
         j = m;
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 44631ef..1041908 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -284,7 +284,7 @@ fn unused_raw_concat_channel_read_stream_file_pipe(
     let chrx = open_files(&range, &channel_config, node);
     while let Ok(file) = chrx.recv().await {
         let mut file = match file {
-            Ok(k) => k.file,
+            Ok(k) => k.file.unwrap(),
             Err(_) => break
         };
         loop {
@@ -346,7 +346,7 @@ pub fn parsed1(
     while let Ok(fileres) = filerx.recv().await {
         match fileres {
             Ok(file) => {
-                let inp = Box::pin(file_content_stream(file.file, query.buffer_size as usize));
+                let inp = Box::pin(file_content_stream(file.file.unwrap(), query.buffer_size as usize));
                 let range = err::todoval();
                 let max_ts = err::todoval();
                 let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone(), file.path, max_ts);
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 138f65f..d212139 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -10,6 +10,7 @@ use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
 use err::Error;
 use futures_util::StreamExt;
 use netpod::log::*;
+use netpod::timeunits::SEC;
 use netpod::{ByteSize, NodeConfigCached, PerfOpts, Shape};
 use std::net::SocketAddr;
 use tokio::io::AsyncWriteExt;
@@ -132,7 +133,8 @@ async fn raw_conn_handler_inner_try(
         Ok(k) => k,
         Err(e) => return Err((e, netout))?,
     };
-    debug!("found config entry {:?}", entry);
+    //info!("found config entry {:?}", entry);
+    info!("raw_conn_handler_inner_try beg {}", range.beg / SEC);
     let shape = match &entry.shape {
         Some(lens) => {
             if lens.len() == 1 {
diff --git a/disk/src/streamlog.rs b/disk/src/streamlog.rs
index 5c856fa..83e36b1 100644
--- a/disk/src/streamlog.rs
+++ b/disk/src/streamlog.rs
@@ -12,6 +12,16 @@
     msg: String,
 }
 
+impl LogItem {
+    pub fn quick(level: Level, msg: String) -> Self {
+        Self {
+            level,
+            msg,
+            node_ix: 42,
+        }
+    }
+}
+
 struct VisitLevel;
 
 impl<'de> Visitor<'de> for VisitLevel {
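Note on the streamlog.rs hunk: `LogItem::quick` still hardcodes `node_ix: 42`, so messages are not yet attributable to a real node. The point of the new `Log` variants is that diagnostics ride the same stream as data and reach the requester instead of only the node's local log. An illustrative model of that pass-through (the enum and field names here are stand-ins, not the crate's types):

// Illustrative model of a log item traveling inside the data stream.
#[derive(Debug)]
struct LogItem {
    node_ix: u32,
    msg: String,
}

#[derive(Debug)]
enum StreamItem {
    Events(Vec<u64>),
    Log(LogItem),
}

fn main() {
    let items = vec![
        StreamItem::Log(LogItem { node_ix: 0, msg: "handle file \"data_0\"".into() }),
        StreamItem::Events(vec![1, 2, 3]),
    ];
    for item in items {
        match item {
            // Data flows on to aggregation...
            StreamItem::Events(evs) => println!("got {} events", evs.len()),
            // ...while log items are surfaced to the requester.
            StreamItem::Log(l) => eprintln!("node {}: {}", l.node_ix, l.msg),
        }
    }
}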