From a4e2326650ba61ec1cb96d4cd6755bb4482839bf Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 9 Sep 2021 16:28:56 +0200
Subject: [PATCH] Fix when unexpected split shows up in local storage

---
 disk/src/dataopen.rs     | 391 ++++++++++++++------------------------
 disk/src/eventblobs.rs   |  81 +++++---
 disk/src/eventchunker.rs |  48 ++++-
 disk/src/lib.rs          |  16 +-
 disk/src/mergeblobs.rs   | 249 +++++++++++++++++++++++++
 disk/src/paths.rs        |  54 +++++-
 httpret/src/api1.rs      |  23 +++
 httpret/src/lib.rs       |   6 +
 8 files changed, 589 insertions(+), 279 deletions(-)
 create mode 100644 disk/src/mergeblobs.rs

diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs
index 5256a81..9b21296 100644
--- a/disk/src/dataopen.rs
+++ b/disk/src/dataopen.rs
@@ -18,6 +18,12 @@ pub struct OpenedFile {
     pub nreads: u32,
 }
 
+#[derive(Debug)]
+pub struct OpenedFileSet {
+    pub timebin: u64,
+    pub files: Vec<OpenedFile>,
+}
+
 impl fmt::Debug for OpenedFile {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("OpenedFile")
@@ -34,7 +40,7 @@ pub fn open_files(
     range: &NanoRange,
     channel_config: &ChannelConfig,
     node: Node,
-) -> async_channel::Receiver<Result<OpenedFile, Error>> {
+) -> async_channel::Receiver<Result<OpenedFileSet, Error>> {
     let (chtx, chrx) = async_channel::bounded(2);
     let range = range.clone();
     let channel_config = channel_config.clone();
@@ -53,30 +59,16 @@ pub fn open_files(
 }
 
 async fn open_files_inner(
-    chtx: &async_channel::Sender<Result<OpenedFile, Error>>,
+    chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
     range: &NanoRange,
     channel_config: &ChannelConfig,
     node: Node,
 ) -> Result<(), Error> {
     let channel_config = channel_config.clone();
-    let mut timebins = vec![];
-    {
-        let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
-        let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
-        while let Some(e) = rd.next().await {
-            let e = e?;
-            let dn = e
-                .file_name()
-                .into_string()
-                .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
-            let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
-            if vv == 19 {
-                timebins.push(dn.parse::<u64>()?);
-            }
-        }
+    let timebins = get_timebins(&channel_config, node.clone()).await?;
+    if timebins.len() == 0 {
+        return Ok(());
     }
-    timebins.sort_unstable();
-    let timebins = timebins;
     for &tb in &timebins {
         let ts_bin = Nanos {
             ns: tb * channel_config.time_bin_size.ns,
         };
@@ -87,10 +79,37 @@ async fn open_files_inner(
         if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg {
             continue;
         }
-        let path = paths::datapath(tb, &channel_config, &node);
-        let mut file = OpenOptions::new().read(true).open(&path).await?;
-        let ret = {
-            let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
+        let mut a = vec![];
+        for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
+            let w = position_file(&path, range, &channel_config, false).await?;
+            if w.found {
+                a.push(w.file);
+            }
+        }
+        let h = OpenedFileSet { timebin: tb, files: a };
+        info!(
+            "----- open_files_inner giving OpenedFileSet with {} files",
+            h.files.len()
+        );
+        chtx.send(Ok(h)).await?;
+    }
+    Ok(())
+}
+
+struct Positioned {
+    file: OpenedFile,
+    found: bool,
+}
+
+async fn position_file(
+    path: &PathBuf,
+    range: &NanoRange,
+    channel_config: &ChannelConfig,
+    expand: bool,
+) -> Result<Positioned, Error> {
+    match OpenOptions::new().read(true).open(&path).await {
+        Ok(file) => {
+            let index_path = PathBuf::from(format!("{}_Index", path.to_str().unwrap()));
             match OpenOptions::new().read(true).open(&index_path).await {
                 Ok(mut index_file) => {
                     let meta = index_file.metadata().await?;
@@ -118,63 +137,91 @@ async fn open_files_inner(
                     let mut buf = BytesMut::with_capacity(meta.len() as usize);
                     buf.resize(buf.capacity(), 0);
                     index_file.read_exact(&mut buf).await?;
-                    match super::index::find_ge(range.beg, &buf[2..])? {
+                    let gg = if expand {
+                        super::index::find_largest_smaller_than(range.beg, &buf[2..])?
+                    } else {
+                        super::index::find_ge(range.beg, &buf[2..])?
+                    };
+                    match gg {
                         Some(o) => {
+                            let mut file = file;
                             file.seek(SeekFrom::Start(o.1)).await?;
-                            OpenedFile {
+                            info!("position_file case A {:?}", path);
+                            let g = OpenedFile {
                                 file: Some(file),
-                                path,
+                                path: path.clone(),
                                 positioned: true,
                                 index: true,
                                 nreads: 0,
-                            }
+                            };
+                            return Ok(Positioned { file: g, found: true });
+                        }
+                        None => {
+                            info!("position_file case B {:?}", path);
+                            let g = OpenedFile {
+                                file: None,
+                                path: path.clone(),
+                                positioned: false,
+                                index: true,
+                                nreads: 0,
+                            };
+                            return Ok(Positioned { file: g, found: false });
                         }
-                        None => OpenedFile {
-                            file: None,
-                            path,
-                            positioned: false,
-                            index: true,
-                            nreads: 0,
-                        },
                     }
                 }
                 Err(e) => match e.kind() {
                     ErrorKind::NotFound => {
                         let ts1 = Instant::now();
-                        let res = super::index::position_static_len_datafile(file, range.beg).await?;
+                        let res = if expand {
+                            super::index::position_static_len_datafile_at_largest_smaller_than(file, range.beg).await?
+                        } else {
+                            super::index::position_static_len_datafile(file, range.beg).await?
+                        };
                         let ts2 = Instant::now();
                         if false {
                             // TODO collect for stats:
                             let dur = ts2.duration_since(ts1);
                             info!("position_static_len_datafile took ms {}", dur.as_millis());
                         }
-                        file = res.0;
+                        let file = res.0;
                         if res.1 {
-                            OpenedFile {
+                            info!("position_file case C {:?}", path);
+                            let g = OpenedFile {
                                 file: Some(file),
-                                path,
+                                path: path.clone(),
                                 positioned: true,
                                 index: false,
                                 nreads: res.2,
-                            }
+                            };
+                            return Ok(Positioned { file: g, found: true });
                         } else {
-                            OpenedFile {
+                            info!("position_file case D {:?}", path);
+                            let g = OpenedFile {
                                 file: None,
-                                path,
+                                path: path.clone(),
                                 positioned: false,
                                 index: false,
                                 nreads: res.2,
-                            }
+                            };
+                            return Ok(Positioned { file: g, found: false });
                         }
                     }
                     _ => Err(e)?,
                 },
             }
-    };
-    chtx.send(Ok(ret)).await?;
+        }
+        Err(e) => {
+            warn!("can not open {:?} error {:?}", path, e);
+            let g = OpenedFile {
+                file: None,
+                path: path.clone(),
+                positioned: false,
+                index: true,
+                nreads: 0,
+            };
+            return Ok(Positioned { file: g, found: false });
+        }
     }
-    // TODO keep track of number of running
-    Ok(())
 }
 
 /*
@@ -185,7 +232,7 @@ pub fn open_expanded_files(
     range: &NanoRange,
     channel_config: &ChannelConfig,
     node: Node,
-) -> async_channel::Receiver<Result<OpenedFile, Error>> {
+) -> async_channel::Receiver<Result<OpenedFileSet, Error>> {
     let (chtx, chrx) = async_channel::bounded(2);
     let range = range.clone();
     let channel_config = channel_config.clone();
@@ -203,31 +250,33 @@ pub fn open_expanded_files(
     chrx
 }
 
+async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result<Vec<u64>, Error> {
+    let mut timebins = vec![];
+    let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
+    let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
+    while let Some(e) = rd.next().await {
+        let e = e?;
+        let dn = e
+            .file_name()
+            .into_string()
+            .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
+        let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
+        if vv == 19 {
+            timebins.push(dn.parse::<u64>()?);
+        }
+    }
+    timebins.sort_unstable();
+    Ok(timebins)
+}
+
 async fn open_expanded_files_inner(
-    chtx: &async_channel::Sender<Result<OpenedFile, Error>>,
+    chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
     range: &NanoRange,
     channel_config: &ChannelConfig,
     node: Node,
 ) -> Result<(), Error> {
     let channel_config = channel_config.clone();
-    let mut timebins = vec![];
-    {
-        let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
-        let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
-        while let Some(e) = rd.next().await {
-            let e = e?;
-            let dn = e
-                .file_name()
-                .into_string()
-                .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
-            let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
-            if vv == 19 {
-                timebins.push(dn.parse::<u64>()?);
-            }
-        }
-    }
-    timebins.sort_unstable();
-    let timebins = timebins;
+    let timebins = get_timebins(&channel_config, node.clone()).await?;
     if timebins.len() == 0 {
         return Ok(());
     }
@@ -245,206 +294,50 @@ async fn open_expanded_files_inner(
     let mut found_first = false;
     loop {
         let tb = timebins[p1];
-        let ts_bin = Nanos {
-            ns: tb * channel_config.time_bin_size.ns,
-        };
-        let path = paths::datapath(tb, &channel_config, &node);
-        let mut file = OpenOptions::new().read(true).open(&path).await?;
-        let ret = {
-            let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
-            match OpenOptions::new().read(true).open(&index_path).await {
-                Ok(mut index_file) => {
-                    let meta = index_file.metadata().await?;
-                    if meta.len() > 1024 * 1024 * 20 {
-                        return Err(Error::with_msg(format!(
-                            "too large index file {} bytes for {}",
-                            meta.len(),
-                            channel_config.channel.name
-                        )));
-                    }
-                    if meta.len() < 2 {
-                        return Err(Error::with_msg(format!(
-                            "bad meta len {} for {}",
-                            meta.len(),
-                            channel_config.channel.name
-                        )));
-                    }
-                    if meta.len() % 16 != 2 {
-                        return Err(Error::with_msg(format!(
-                            "bad meta len {} for {}",
-                            meta.len(),
-                            channel_config.channel.name
-                        )));
-                    }
-                    let mut buf = BytesMut::with_capacity(meta.len() as usize);
-                    buf.resize(buf.capacity(), 0);
-                    index_file.read_exact(&mut buf).await?;
-                    match super::index::find_largest_smaller_than(range.beg, &buf[2..])? {
-                        Some(o) => {
-                            found_first = true;
-                            file.seek(SeekFrom::Start(o.1)).await?;
-                            OpenedFile {
-                                file: Some(file),
-                                path,
-                                positioned: true,
-                                index: true,
-                                nreads: 0,
-                            }
-                        }
-                        None => OpenedFile {
-                            file: None,
-                            path,
-                            positioned: false,
-                            index: true,
-                            nreads: 0,
-                        },
-                    }
-                }
-                Err(e) => match e.kind() {
-                    ErrorKind::NotFound => {
-                        let ts1 = Instant::now();
-                        let res =
-                            super::index::position_static_len_datafile_at_largest_smaller_than(file, range.beg).await?;
-                        let ts2 = Instant::now();
-                        if false {
-                            // TODO collect for stats:
-                            let dur = ts2.duration_since(ts1);
-                            info!("position_static_len_datafile took ms {}", dur.as_millis());
-                        }
-                        file = res.0;
-                        if res.1 {
-                            found_first = true;
-                            OpenedFile {
-                                file: Some(file),
-                                path,
-                                positioned: true,
-                                index: false,
-                                nreads: res.2,
-                            }
-                        } else {
-                            OpenedFile {
-                                file: None,
-                                path,
-                                positioned: false,
-                                index: false,
-                                nreads: res.2,
-                            }
-                        }
-                    }
-                    _ => Err(e)?,
-                },
+        let mut a = vec![];
+        for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
+            let w = position_file(&path, range, &channel_config, true).await?;
+            if w.found {
+                info!("----- open_expanded_files_inner w.found for {:?}", path);
+                a.push(w.file);
+                found_first = true;
             }
-        };
+        }
+        let h = OpenedFileSet { timebin: tb, files: a };
+        info!(
+            "----- open_expanded_files_inner giving OpenedFileSet with {} files",
+            h.files.len()
+        );
+        chtx.send(Ok(h)).await?;
         if found_first {
             p1 += 1;
-            chtx.send(Ok(ret)).await?;
+            break;
+        } else if p1 == 0 {
             break;
         } else {
-            if p1 == 0 {
-                break;
-            } else {
-                p1 -= 1;
-            }
+            p1 -= 1;
         }
     }
     if found_first {
         // Append all following positioned files.
         loop {
             let tb = timebins[p1];
-            let ts_bin = Nanos {
-                ns: tb * channel_config.time_bin_size.ns,
-            };
-            let path = paths::datapath(tb, &channel_config, &node);
-            let mut file = OpenOptions::new().read(true).open(&path).await?;
-            let ret = {
-                let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
-                match OpenOptions::new().read(true).open(&index_path).await {
-                    Ok(mut index_file) => {
-                        let meta = index_file.metadata().await?;
-                        if meta.len() > 1024 * 1024 * 20 {
-                            return Err(Error::with_msg(format!(
-                                "too large index file {} bytes for {}",
-                                meta.len(),
-                                channel_config.channel.name
-                            )));
-                        }
-                        if meta.len() < 2 {
-                            return Err(Error::with_msg(format!(
-                                "bad meta len {} for {}",
-                                meta.len(),
-                                channel_config.channel.name
-                            )));
-                        }
-                        if meta.len() % 16 != 2 {
-                            return Err(Error::with_msg(format!(
-                                "bad meta len {} for {}",
-                                meta.len(),
-                                channel_config.channel.name
-                            )));
-                        }
-                        let mut buf = BytesMut::with_capacity(meta.len() as usize);
-                        buf.resize(buf.capacity(), 0);
-                        index_file.read_exact(&mut buf).await?;
-                        match super::index::find_ge(range.beg, &buf[2..])? {
-                            Some(o) => {
-                                file.seek(SeekFrom::Start(o.1)).await?;
-                                OpenedFile {
-                                    file: Some(file),
-                                    path,
-                                    positioned: true,
-                                    index: true,
-                                    nreads: 0,
-                                }
-                            }
-                            None => OpenedFile {
-                                file: None,
-                                path,
-                                positioned: false,
-                                index: true,
-                                nreads: 0,
-                            },
-                        }
-                    }
-                    Err(e) => match e.kind() {
-                        ErrorKind::NotFound => {
-                            let ts1 = Instant::now();
-                            let res = super::index::position_static_len_datafile(file, range.beg).await?;
-                            let ts2 = Instant::now();
-                            if false {
-                                // TODO collect for stats:
-                                let dur = ts2.duration_since(ts1);
-                                info!("position_static_len_datafile took ms {}", dur.as_millis());
-                            }
-                            file = res.0;
-                            if res.1 {
-                                OpenedFile {
-                                    file: Some(file),
-                                    path,
-                                    positioned: true,
-                                    index: false,
-                                    nreads: res.2,
-                                }
-                            } else {
-                                OpenedFile {
-                                    file: None,
-                                    path,
-                                    positioned: false,
-                                    index: false,
-                                    nreads: res.2,
-                                }
-                            }
-                        }
-                        _ => Err(e)?,
-                    },
+            let mut a = vec![];
+            for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
+                let w = position_file(&path, range, &channel_config, false).await?;
+                if w.found {
+                    a.push(w.file);
                 }
-            };
-            chtx.send(Ok(ret)).await?;
+            }
+            let h = OpenedFileSet { timebin: tb, files: a };
+            chtx.send(Ok(h)).await?;
             p1 += 1;
             if p1 >= timebins.len() {
                 break;
             }
         }
     } else {
+        info!("Could not find any event before the requested range, falling back to the standard file list.");
         // Try to locate files according to non-expand-algorithm.
         open_files_inner(chtx, range, &channel_config, node).await?;
     }
@@ -481,7 +374,7 @@ fn expanded_file_list() {
         match file {
             Ok(k) => {
                 info!("opened file: {:?}", k);
-                paths.push(k.path.clone());
+                paths.push(k.files);
             }
             Err(e) => {
                 error!("error while trying to open {:?}", e);
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index 754dd5f..c991b9c 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -1,10 +1,11 @@
-use crate::dataopen::{open_expanded_files, open_files, OpenedFile};
+use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet};
 use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull};
-use crate::file_content_stream;
+use crate::mergeblobs::MergedBlobsStream;
+use crate::{file_content_stream, HasSeenBeforeRangeCount};
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
-use items::{LogItem, RangeCompletableItem, StreamItem};
+use items::{LogItem, RangeCompletableItem, Sitemty, StreamItem};
 use netpod::log::*;
 use netpod::timeunits::SEC;
 use netpod::{ChannelConfig, NanoRange, Node};
@@ -13,10 +14,14 @@
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+pub trait InputTraits: Stream<Item = Sitemty<EventFull>> + HasSeenBeforeRangeCount {}
+
+impl<T> InputTraits for T where T: Stream<Item = Sitemty<EventFull>> + HasSeenBeforeRangeCount {}
+
 pub struct EventChunkerMultifile {
     channel_config: ChannelConfig,
-    file_chan: async_channel::Receiver<Result<OpenedFile, Error>>,
-    evs: Option<EventChunker>,
+    file_chan: async_channel::Receiver<Result<OpenedFileSet, Error>>,
+    evs: Option<Pin<Box<dyn InputTraits + Send>>>,
     buffer_size: usize,
     event_chunker_conf: EventChunkerConf,
     range: NanoRange,
@@ -108,27 +113,55 @@ impl Stream for EventChunkerMultifile {
             },
             None => match self.file_chan.poll_next_unpin(cx) {
                 Ready(Some(k)) => match k {
-                    Ok(file) => {
-                        self.files_count += 1;
-                        let path = file.path;
-                        let item = LogItem::quick(Level::INFO, format!("handle file {:?}", path));
-                        match file.file {
-                            Some(file) => {
-                                let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
-                                let chunker = EventChunker::from_event_boundary(
-                                    inp,
-                                    self.channel_config.clone(),
-                                    self.range.clone(),
-                                    self.event_chunker_conf.clone(),
-                                    path,
-                                    self.max_ts.clone(),
-                                    self.expand,
-                                );
-                                self.evs = Some(chunker);
+                    Ok(ofs) => {
+                        self.files_count += ofs.files.len() as u32;
+                        if ofs.files.len() == 1 {
+                            let mut ofs = ofs;
+                            let file = ofs.files.pop().unwrap();
+                            let path = file.path;
+                            let item = LogItem::quick(Level::INFO, format!("handle OFS {:?}", ofs));
+                            match file.file {
+                                Some(file) => {
+                                    let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
+                                    let chunker = EventChunker::from_event_boundary(
+                                        inp,
+                                        self.channel_config.clone(),
+                                        self.range.clone(),
+                                        self.event_chunker_conf.clone(),
+                                        path,
+                                        self.max_ts.clone(),
+                                        self.expand,
+                                    );
+                                    self.evs = Some(Box::pin(chunker));
+                                }
+                                None => {}
                             }
-                            None => {}
+                            Ready(Some(Ok(StreamItem::Log(item))))
+                        } else if ofs.files.len() > 1 {
+                            let item = LogItem::quick(Level::INFO, format!("handle OFS MULTIPLE {:?}", ofs));
+                            let mut chunkers = vec![];
+                            for of in ofs.files {
+                                if let Some(file) = of.file {
+                                    let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
+                                    let chunker = EventChunker::from_event_boundary(
+                                        inp,
+                                        self.channel_config.clone(),
+                                        self.range.clone(),
+                                        self.event_chunker_conf.clone(),
+                                        of.path,
+                                        self.max_ts.clone(),
+                                        self.expand,
+                                    );
+                                    chunkers.push(chunker);
+                                }
+                            }
+                            let merged = MergedBlobsStream::new(chunkers);
+                            self.evs = Some(Box::pin(merged));
+                            Ready(Some(Ok(StreamItem::Log(item))))
+                        } else {
+                            let item = LogItem::quick(Level::INFO, format!("handle OFS {:?} NO FILES", ofs));
+                            Ready(Some(Ok(StreamItem::Log(item))))
                         }
-                        Ready(Some(Ok(StreamItem::Log(item))))
                     }
                     Err(e) => {
                         self.errored = true;
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index db93a8d..f10e6ae 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -1,10 +1,10 @@
-use crate::{FileChunkRead, NeedMinBuffer};
+use crate::{FileChunkRead, HasSeenBeforeRangeCount, NeedMinBuffer};
 use bitshuffle::bitshuffle_decompress;
 use bytes::{Buf, BytesMut};
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
-use items::{RangeCompletableItem, StatsItem, StreamItem};
+use items::{Appendable, PushableIndex, RangeCompletableItem, StatsItem, StreamItem, WithLen, WithTimestamps};
 use netpod::log::*;
 use netpod::timeunits::SEC;
 use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
@@ -360,6 +360,44 @@ impl EventFull {
     }
 }
 
+impl WithLen for EventFull {
+    fn len(&self) -> usize {
+        self.tss.len()
+    }
+}
+
+impl Appendable for EventFull {
+    fn empty() -> Self {
+        Self::empty()
+    }
+
+    // TODO expensive, get rid of it.
+    fn append(&mut self, src: &Self) {
+        self.tss.extend_from_slice(&src.tss);
+        self.pulses.extend_from_slice(&src.pulses);
+        self.decomps.extend_from_slice(&src.decomps);
+        self.scalar_types.extend_from_slice(&src.scalar_types);
+        self.be.extend_from_slice(&src.be);
+    }
+}
+
+impl WithTimestamps for EventFull {
+    fn ts(&self, ix: usize) -> u64 {
+        self.tss[ix]
+    }
+}
+
+impl PushableIndex for EventFull {
+    // TODO check all use cases; could we move instead of cloning?
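+    // Copies the event at index `ix` from `src` into this batch. The
+    // timestamp-merge in mergeblobs relies on this to transfer events
+    // one at a time in ascending ts order.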
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        self.tss.push(src.tss[ix]);
+        self.pulses.push(src.pulses[ix]);
+        self.decomps.push(src.decomps[ix].clone());
+        self.scalar_types.push(src.scalar_types[ix].clone());
+        self.be.push(src.be[ix]);
+    }
+}
+
 impl Stream for EventChunker {
     type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;
 
@@ -442,3 +480,9 @@ impl Stream for EventChunker {
         }
     }
 }
+
+impl HasSeenBeforeRangeCount for EventChunker {
+    fn seen_before_range_count(&self) -> usize {
+        self.seen_before_range_count()
+    }
+}
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 0dae017..531e0fc 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -31,6 +31,7 @@ pub mod frame;
 pub mod gen;
 pub mod index;
 pub mod merge;
+pub mod mergeblobs;
 pub mod paths;
 pub mod raw;
 pub mod streamlog;
@@ -283,7 +284,10 @@ fn unused_raw_concat_channel_read_stream_file_pipe(
         let chrx = open_files(&range, &channel_config, node);
         while let Ok(file) = chrx.recv().await {
             let mut file = match file {
-                Ok(k) => k.file.unwrap(),
+                Ok(mut k) => {
+                    k.files.truncate(1);
+                    k.files.pop().unwrap().file.unwrap()
+                }
                 Err(_) => break
             };
             loop {
@@ -419,7 +423,8 @@ impl Stream for NeedMinBuffer {
     }
 }
 
-pub fn raw_concat_channel_read_stream(
+#[allow(dead_code)]
+fn raw_concat_channel_read_stream(
     query: &netpod::AggQuerySingleChannel,
     node: Node,
 ) -> impl Stream<Item = Result<BytesMut, Error>> + Send {
@@ -443,7 +448,8 @@ pub fn raw_concat_channel_read_stream(
     }
 }
 
-pub fn raw_concat_channel_read_stream_timebin(
+#[allow(dead_code)]
+fn raw_concat_channel_read_stream_timebin(
     query: &netpod::AggQuerySingleChannel,
     node: Node,
 ) -> impl Stream<Item = Result<BytesMut, Error>> {
@@ -507,3 +513,7 @@ impl ChannelConfigExt for ChannelConfig {
         ret
     }
 }
+
+pub trait HasSeenBeforeRangeCount {
+    fn seen_before_range_count(&self) -> usize;
+}
diff --git a/disk/src/mergeblobs.rs b/disk/src/mergeblobs.rs
new file mode 100644
index 0000000..18dec52
--- /dev/null
+++ b/disk/src/mergeblobs.rs
@@ -0,0 +1,249 @@
+use crate::HasSeenBeforeRangeCount;
+use err::Error;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use items::{
+    Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen, WithTimestamps,
+};
+use netpod::log::*;
+use netpod::EventDataReadStats;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+enum MergedCurVal<T> {
+    None,
+    Finish,
+    Val(T),
+}
+
+pub struct MergedBlobsStream<S, I>
+where
+    S: Stream<Item = Sitemty<I>> + Unpin,
+    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
+{
+    inps: Vec<S>,
+    current: Vec<MergedCurVal<I>>,
+    ixs: Vec<usize>,
+    errored: bool,
+    completed: bool,
+    batch: I,
+    ts_last_emit: u64,
+    range_complete_observed: Vec<bool>,
+    range_complete_observed_all: bool,
+    range_complete_observed_all_emitted: bool,
+    data_emit_complete: bool,
+    batch_size: usize,
+    logitems: VecDeque<LogItem>,
+    event_data_read_stats_items: VecDeque<EventDataReadStats>,
+}
+
+impl<S, I> MergedBlobsStream<S, I>
+where
+    S: Stream<Item = Sitemty<I>> + Unpin,
+    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
+{
+    pub fn new(inps: Vec<S>) -> Self {
+        let n = inps.len();
+        let current = (0..n).map(|_| MergedCurVal::None).collect();
+        Self {
+            inps,
+            current,
+            ixs: vec![0; n],
+            errored: false,
+            completed: false,
+            batch: I::empty(),
+            ts_last_emit: 0,
+            range_complete_observed: vec![false; n],
+            range_complete_observed_all: false,
+            range_complete_observed_all_emitted: false,
+            data_emit_complete: false,
+            batch_size: 64,
+            logitems: VecDeque::new(),
+            event_data_read_stats_items: VecDeque::new(),
+        }
+    }
+
+    fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
+        use Poll::*;
+        let mut pending = 0;
+        for i1 in 0..self.inps.len() {
+            match self.current[i1] {
+                MergedCurVal::None => {
+                    'l1: loop {
+                        break match self.inps[i1].poll_next_unpin(cx) {
+                            Ready(Some(Ok(k))) => match k {
+                                StreamItem::Log(item) => {
+                                    self.logitems.push_back(item);
+                                    continue 'l1;
+                                }
+                                StreamItem::Stats(item) => {
+                                    match item {
+                                        StatsItem::EventDataReadStats(item) => {
+                                            self.event_data_read_stats_items.push_back(item);
+                                        }
+                                    }
+                                    continue 'l1;
+                                }
+                                StreamItem::DataItem(item) => match item {
+                                    RangeCompletableItem::RangeComplete => {
+                                        self.range_complete_observed[i1] = true;
+                                        let d = self.range_complete_observed.iter().filter(|&&k| k).count();
+                                        if d == self.range_complete_observed.len() {
+                                            self.range_complete_observed_all = true;
+                                            debug!("MergedStream range_complete d {} COMPLETE", d);
+                                        } else {
+                                            trace!("MergedStream range_complete d {}", d);
+                                        }
+                                        continue 'l1;
+                                    }
+                                    RangeCompletableItem::Data(item) => {
+                                        self.ixs[i1] = 0;
+                                        self.current[i1] = MergedCurVal::Val(item);
+                                    }
+                                },
+                            },
+                            Ready(Some(Err(e))) => {
+                                // TODO emit this error and consider this stream done; anything more to do here?
+                                //self.current[i1] = CurVal::Err(e);
+                                self.errored = true;
+                                return Ready(Err(e));
+                            }
+                            Ready(None) => {
+                                self.current[i1] = MergedCurVal::Finish;
+                            }
+                            Pending => {
+                                pending += 1;
+                            }
+                        };
+                    }
+                }
+                _ => (),
+            }
+        }
+        if pending > 0 {
+            Pending
+        } else {
+            Ready(Ok(()))
+        }
+    }
+}
+
+impl<S, I> Stream for MergedBlobsStream<S, I>
+where
+    S: Stream<Item = Sitemty<I>> + Unpin,
+    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
+{
+    type Item = Sitemty<I>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        'outer: loop {
+            break if self.completed {
+                panic!("poll_next on completed");
+            } else if self.errored {
+                self.completed = true;
+                Ready(None)
+            } else if let Some(item) = self.logitems.pop_front() {
+                Ready(Some(Ok(StreamItem::Log(item))))
+            } else if let Some(item) = self.event_data_read_stats_items.pop_front() {
+                Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))))
+            } else if self.data_emit_complete {
+                if self.range_complete_observed_all {
+                    if self.range_complete_observed_all_emitted {
+                        self.completed = true;
+                        Ready(None)
+                    } else {
+                        self.range_complete_observed_all_emitted = true;
+                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
+                    }
+                } else {
+                    self.completed = true;
+                    Ready(None)
+                }
+            } else {
+                // Can only run the merge logic once every input is finished, errored, or holds a current value.
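+                // Among the inputs that currently hold data, pick the one whose
+                // next event has the smallest timestamp, and move exactly one
+                // event into the output batch.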
+                match self.replenish(cx) {
+                    Ready(Ok(_)) => {
+                        let mut lowest_ix = usize::MAX;
+                        let mut lowest_ts = u64::MAX;
+                        for i1 in 0..self.inps.len() {
+                            if let MergedCurVal::Val(val) = &self.current[i1] {
+                                let u = self.ixs[i1];
+                                if u >= val.len() {
+                                    self.ixs[i1] = 0;
+                                    self.current[i1] = MergedCurVal::None;
+                                    continue 'outer;
+                                } else {
+                                    let ts = val.ts(u);
+                                    if ts < lowest_ts {
+                                        lowest_ix = i1;
+                                        lowest_ts = ts;
+                                    }
+                                }
+                            }
+                        }
+                        if lowest_ix == usize::MAX {
+                            if self.batch.len() != 0 {
+                                let emp = I::empty();
+                                let ret = std::mem::replace(&mut self.batch, emp);
+                                self.data_emit_complete = true;
+                                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
+                            } else {
+                                self.data_emit_complete = true;
+                                continue 'outer;
+                            }
+                        } else {
+                            assert!(lowest_ts >= self.ts_last_emit);
+                            let emp = I::empty();
+                            let mut local_batch = std::mem::replace(&mut self.batch, emp);
+                            self.ts_last_emit = lowest_ts;
+                            let rix = self.ixs[lowest_ix];
+                            match &self.current[lowest_ix] {
+                                MergedCurVal::Val(val) => {
+                                    local_batch.push_index(val, rix);
+                                }
+                                MergedCurVal::None => panic!(),
+                                MergedCurVal::Finish => panic!(),
+                            }
+                            self.batch = local_batch;
+                            self.ixs[lowest_ix] += 1;
+                            let curlen = match &self.current[lowest_ix] {
+                                MergedCurVal::Val(val) => val.len(),
+                                MergedCurVal::None => panic!(),
+                                MergedCurVal::Finish => panic!(),
+                            };
+                            if self.ixs[lowest_ix] >= curlen {
+                                self.ixs[lowest_ix] = 0;
+                                self.current[lowest_ix] = MergedCurVal::None;
+                            }
+                            if self.batch.len() >= self.batch_size {
+                                let emp = I::empty();
+                                let ret = std::mem::replace(&mut self.batch, emp);
+                                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
+                            } else {
+                                continue 'outer;
+                            }
+                        }
+                    }
+                    Ready(Err(e)) => {
+                        self.errored = true;
+                        Ready(Some(Err(e)))
+                    }
+                    Pending => Pending,
+                }
+            };
+        }
+    }
+}
+
+impl<S, I> HasSeenBeforeRangeCount for MergedBlobsStream<S, I>
+where
+    S: Stream<Item = Sitemty<I>> + Unpin,
+    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
+{
+    fn seen_before_range_count(&self) -> usize {
+        // TODO (only for debug)
+        0
+    }
+}
diff --git a/disk/src/paths.rs b/disk/src/paths.rs
index e39ab1b..8f59897 100644
--- a/disk/src/paths.rs
+++ b/disk/src/paths.rs
@@ -1,10 +1,12 @@
 use err::Error;
+use futures_util::StreamExt;
+use netpod::log::*;
 use netpod::timeunits::MS;
 use netpod::{ChannelConfig, Nanos, Node};
 use std::path::PathBuf;
 
+// TODO remove this
 pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
-    //let pre = "/data/sf-databuffer/daq_swissfel";
     node.data_base_path
         .join(format!("{}_{}", node.ksprefix, config.keyspace))
         .join("byTime")
         .join(config.channel.name.clone())
         .join(format!("{:019}", timebin))
         .join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS))
 }
 
+/**
+Return potential datafile paths for the given timebin.
+
+These are only "potential" paths: the files are not opened here, and they may
+vanish before they are opened. The timebin itself may not even exist.
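+
+For example (hypothetical channel and timebin values), a timebin directory
+holding two splits, 0 and 3, would yield:
+
+  <data_base_path>/<ksprefix>_2/byTime/<channel>/0000000000000018456/0000000000/0000000000086400000_00000_Data
+  <data_base_path>/<ksprefix>_2/byTime/<channel>/0000000000000018456/0000000003/0000000000086400000_00000_Data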
+*/ +pub async fn datapaths_for_timebin( + timebin: u64, + config: &netpod::ChannelConfig, + node: &Node, +) -> Result, Error> { + let timebin_path = node + .data_base_path + .join(format!("{}_{}", node.ksprefix, config.keyspace)) + .join("byTime") + .join(config.channel.name.clone()) + .join(format!("{:019}", timebin)); + let rd = tokio::fs::read_dir(timebin_path).await?; + let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd); + let mut splits = vec![]; + while let Some(e) = rd.next().await { + let e = e?; + let dn = e + .file_name() + .into_string() + .map_err(|s| Error::with_msg(format!("Bad OS path {:?}", s)))?; + if dn.len() != 10 { + return Err(Error::with_msg(format!("bad split dirname {:?}", e))); + } + let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a }); + if vv == 10 { + splits.push(dn.parse::()?); + } + } + let mut ret = vec![]; + for split in splits { + let path = node + .data_base_path + .join(format!("{}_{}", node.ksprefix, config.keyspace)) + .join("byTime") + .join(config.channel.name.clone()) + .join(format!("{:019}", timebin)) + .join(format!("{:010}", split)) + .join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS)); + ret.push(path); + } + info!("\n\ndatapaths_for_timebin returns: {:?}\n", ret); + Ok(ret) +} + pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Result { let ret = node .data_base_path diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index afda07d..ae771a6 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -4,6 +4,7 @@ use err::Error; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; use itertools::Itertools; +use netpod::{log::*, NodeConfigCached, APP_OCTET}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; @@ -461,3 +462,25 @@ pub async fn proxy_distribute_v1(req: Request) -> Result, E }); Ok(res) } + +pub async fn api1_binary_events(req: Request, _node_config: &NodeConfigCached) -> Result, Error> { + info!("api1_binary_events headers: {:?}", req.headers()); + let accept_def = ""; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_OCTET { + // Ok(plain_events_binary(req, node_config).await.map_err(|e| { + // error!("{:?}", e); + // e + // })?) + let e = Error::with_msg(format!("unexpected Accept: {:?}", accept)); + error!("{:?}", e); + Err(e) + } else { + let e = Error::with_msg(format!("unexpected Accept: {:?}", accept)); + error!("{:?}", e); + Err(e) + } +} diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 99a88e8..67cb7a9 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -240,6 +240,12 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path == "/api/1/query" { + if req.method() == Method::POST { + Ok(api1::api1_binary_events(req, &node_config).await?) + } else { + Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) + } } else if path.starts_with("/api/1/documentation/") { if req.method() == Method::GET { api_1_docs(path)