Fix handling when an unexpected split shows up in local storage

Dominik Werder
2021-09-09 16:28:56 +02:00
parent c455c01d24
commit a4e2326650
8 changed files with 589 additions and 279 deletions
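For context: the storage layout this commit deals with keeps, under each 19-digit timebin directory, one or more 10-digit split directories, each holding its own data file. A minimal sketch of the path construction, mirroring the format strings used by paths::datapath and the new paths::datapaths_for_timebin below; all names and values here are illustrative, not the crate's actual API.

use std::path::{Path, PathBuf};

// Illustrative only: reproduces the path format assumed by this commit.
fn data_file_path(
    base: &Path,
    ksprefix: &str,
    keyspace: u32,
    channel: &str,
    timebin: u64,
    split: u64,
    time_bin_size_ms: u64,
) -> PathBuf {
    base.join(format!("{}_{}", ksprefix, keyspace))
        .join("byTime")
        .join(channel)
        .join(format!("{:019}", timebin)) // timebin directory, 19 digits
        .join(format!("{:010}", split)) // split directory, 10 digits
        .join(format!("{:019}_00000_Data", time_bin_size_ms))
}

fn main() {
    let p = data_file_path(Path::new("/data"), "ks", 2, "CHAN-X", 18123, 0, 3_600_000);
    println!("{}", p.display());
}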

View File

@@ -18,6 +18,12 @@ pub struct OpenedFile {
pub nreads: u32,
}
#[derive(Debug)]
pub struct OpenedFileSet {
pub timebin: u64,
pub files: Vec<OpenedFile>,
}
impl fmt::Debug for OpenedFile {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("OpenedFile")
@@ -34,7 +40,7 @@ pub fn open_files(
range: &NanoRange,
channel_config: &ChannelConfig,
node: Node,
) -> async_channel::Receiver<Result<OpenedFile, Error>> {
) -> async_channel::Receiver<Result<OpenedFileSet, Error>> {
let (chtx, chrx) = async_channel::bounded(2);
let range = range.clone();
let channel_config = channel_config.clone();
@@ -53,30 +59,16 @@ pub fn open_files(
}
async fn open_files_inner(
chtx: &async_channel::Sender<Result<OpenedFile, Error>>,
chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
range: &NanoRange,
channel_config: &ChannelConfig,
node: Node,
) -> Result<(), Error> {
let channel_config = channel_config.clone();
let mut timebins = vec![];
{
let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
while let Some(e) = rd.next().await {
let e = e?;
let dn = e
.file_name()
.into_string()
.map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
if vv == 19 {
timebins.push(dn.parse::<u64>()?);
}
}
let timebins = get_timebins(&channel_config, node.clone()).await?;
if timebins.len() == 0 {
return Ok(());
}
timebins.sort_unstable();
let timebins = timebins;
for &tb in &timebins {
let ts_bin = Nanos {
ns: tb * channel_config.time_bin_size.ns,
@@ -87,10 +79,37 @@ async fn open_files_inner(
if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg {
continue;
}
let path = paths::datapath(tb, &channel_config, &node);
let mut file = OpenOptions::new().read(true).open(&path).await?;
let ret = {
let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
let mut a = vec![];
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
let w = position_file(&path, range, &channel_config, false).await?;
if w.found {
a.push(w.file);
}
}
let h = OpenedFileSet { timebin: tb, files: a };
info!(
"----- open_files_inner giving OpenedFileSet with {} files",
h.files.len()
);
chtx.send(Ok(h)).await?;
}
Ok(())
}
struct Positioned {
file: OpenedFile,
found: bool,
}
async fn position_file(
path: &PathBuf,
range: &NanoRange,
channel_config: &ChannelConfig,
expand: bool,
) -> Result<Positioned, Error> {
match OpenOptions::new().read(true).open(&path).await {
Ok(file) => {
let index_path = PathBuf::from(format!("{}_Index", path.to_str().unwrap()));
match OpenOptions::new().read(true).open(&index_path).await {
Ok(mut index_file) => {
let meta = index_file.metadata().await?;
@@ -118,63 +137,91 @@ async fn open_files_inner(
let mut buf = BytesMut::with_capacity(meta.len() as usize);
buf.resize(buf.capacity(), 0);
index_file.read_exact(&mut buf).await?;
match super::index::find_ge(range.beg, &buf[2..])? {
let gg = if expand {
super::index::find_largest_smaller_than(range.beg, &buf[2..])?
} else {
super::index::find_ge(range.beg, &buf[2..])?
};
match gg {
Some(o) => {
let mut file = file;
file.seek(SeekFrom::Start(o.1)).await?;
OpenedFile {
info!("position_file case A {:?}", path);
let g = OpenedFile {
file: Some(file),
path,
path: path.clone(),
positioned: true,
index: true,
nreads: 0,
}
};
return Ok(Positioned { file: g, found: true });
}
None => {
info!("position_file case B {:?}", path);
let g = OpenedFile {
file: None,
path: path.clone(),
positioned: false,
index: true,
nreads: 0,
};
return Ok(Positioned { file: g, found: false });
}
None => OpenedFile {
file: None,
path,
positioned: false,
index: true,
nreads: 0,
},
}
}
Err(e) => match e.kind() {
ErrorKind::NotFound => {
let ts1 = Instant::now();
let res = super::index::position_static_len_datafile(file, range.beg).await?;
let res = if expand {
super::index::position_static_len_datafile_at_largest_smaller_than(file, range.beg).await?
} else {
super::index::position_static_len_datafile(file, range.beg).await?
};
let ts2 = Instant::now();
if false {
// TODO collect for stats:
let dur = ts2.duration_since(ts1);
info!("position_static_len_datafile took ms {}", dur.as_millis());
}
file = res.0;
let file = res.0;
if res.1 {
OpenedFile {
info!("position_file case C {:?}", path);
let g = OpenedFile {
file: Some(file),
path,
path: path.clone(),
positioned: true,
index: false,
nreads: res.2,
}
};
return Ok(Positioned { file: g, found: true });
} else {
OpenedFile {
info!("position_file case D {:?}", path);
let g = OpenedFile {
file: None,
path,
path: path.clone(),
positioned: false,
index: false,
nreads: res.2,
}
};
return Ok(Positioned { file: g, found: false });
}
}
_ => Err(e)?,
},
}
};
chtx.send(Ok(ret)).await?;
}
Err(e) => {
warn!("can not open {:?} error {:?}", path, e);
let g = OpenedFile {
file: None,
path: path.clone(),
positioned: false,
index: true,
nreads: 0,
};
return Ok(Positioned { file: g, found: false });
}
}
// TODO keep track of number of running
Ok(())
}
/*
@@ -185,7 +232,7 @@ pub fn open_expanded_files(
range: &NanoRange,
channel_config: &ChannelConfig,
node: Node,
) -> async_channel::Receiver<Result<OpenedFile, Error>> {
) -> async_channel::Receiver<Result<OpenedFileSet, Error>> {
let (chtx, chrx) = async_channel::bounded(2);
let range = range.clone();
let channel_config = channel_config.clone();
@@ -203,31 +250,33 @@ pub fn open_expanded_files(
chrx
}
async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result<Vec<u64>, Error> {
let mut timebins = vec![];
let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
while let Some(e) = rd.next().await {
let e = e?;
let dn = e
.file_name()
.into_string()
.map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
if vv == 19 {
timebins.push(dn.parse::<u64>()?);
}
}
timebins.sort_unstable();
Ok(timebins)
}
async fn open_expanded_files_inner(
chtx: &async_channel::Sender<Result<OpenedFile, Error>>,
chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
range: &NanoRange,
channel_config: &ChannelConfig,
node: Node,
) -> Result<(), Error> {
let channel_config = channel_config.clone();
let mut timebins = vec![];
{
let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
while let Some(e) = rd.next().await {
let e = e?;
let dn = e
.file_name()
.into_string()
.map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
if vv == 19 {
timebins.push(dn.parse::<u64>()?);
}
}
}
timebins.sort_unstable();
let timebins = timebins;
let timebins = get_timebins(&channel_config, node.clone()).await?;
if timebins.len() == 0 {
return Ok(());
}
@@ -245,206 +294,50 @@ async fn open_expanded_files_inner(
let mut found_first = false;
loop {
let tb = timebins[p1];
let ts_bin = Nanos {
ns: tb * channel_config.time_bin_size.ns,
};
let path = paths::datapath(tb, &channel_config, &node);
let mut file = OpenOptions::new().read(true).open(&path).await?;
let ret = {
let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
match OpenOptions::new().read(true).open(&index_path).await {
Ok(mut index_file) => {
let meta = index_file.metadata().await?;
if meta.len() > 1024 * 1024 * 20 {
return Err(Error::with_msg(format!(
"too large index file {} bytes for {}",
meta.len(),
channel_config.channel.name
)));
}
if meta.len() < 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {}",
meta.len(),
channel_config.channel.name
)));
}
if meta.len() % 16 != 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {}",
meta.len(),
channel_config.channel.name
)));
}
let mut buf = BytesMut::with_capacity(meta.len() as usize);
buf.resize(buf.capacity(), 0);
index_file.read_exact(&mut buf).await?;
match super::index::find_largest_smaller_than(range.beg, &buf[2..])? {
Some(o) => {
found_first = true;
file.seek(SeekFrom::Start(o.1)).await?;
OpenedFile {
file: Some(file),
path,
positioned: true,
index: true,
nreads: 0,
}
}
None => OpenedFile {
file: None,
path,
positioned: false,
index: true,
nreads: 0,
},
}
}
Err(e) => match e.kind() {
ErrorKind::NotFound => {
let ts1 = Instant::now();
let res =
super::index::position_static_len_datafile_at_largest_smaller_than(file, range.beg).await?;
let ts2 = Instant::now();
if false {
// TODO collect for stats:
let dur = ts2.duration_since(ts1);
info!("position_static_len_datafile took ms {}", dur.as_millis());
}
file = res.0;
if res.1 {
found_first = true;
OpenedFile {
file: Some(file),
path,
positioned: true,
index: false,
nreads: res.2,
}
} else {
OpenedFile {
file: None,
path,
positioned: false,
index: false,
nreads: res.2,
}
}
}
_ => Err(e)?,
},
let mut a = vec![];
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
let w = position_file(&path, range, &channel_config, true).await?;
if w.found {
info!("----- open_expanded_files_inner w.found for {:?}", path);
a.push(w.file);
found_first = true;
}
};
}
let h = OpenedFileSet { timebin: tb, files: a };
info!(
"----- open_expanded_files_inner giving OpenedFileSet with {} files",
h.files.len()
);
chtx.send(Ok(h)).await?;
if found_first {
p1 += 1;
chtx.send(Ok(ret)).await?;
break;
} else if p1 == 0 {
break;
} else {
if p1 == 0 {
break;
} else {
p1 -= 1;
}
p1 -= 1;
}
}
if found_first {
// Append all following positioned files.
loop {
let tb = timebins[p1];
let ts_bin = Nanos {
ns: tb * channel_config.time_bin_size.ns,
};
let path = paths::datapath(tb, &channel_config, &node);
let mut file = OpenOptions::new().read(true).open(&path).await?;
let ret = {
let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
match OpenOptions::new().read(true).open(&index_path).await {
Ok(mut index_file) => {
let meta = index_file.metadata().await?;
if meta.len() > 1024 * 1024 * 20 {
return Err(Error::with_msg(format!(
"too large index file {} bytes for {}",
meta.len(),
channel_config.channel.name
)));
}
if meta.len() < 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {}",
meta.len(),
channel_config.channel.name
)));
}
if meta.len() % 16 != 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {}",
meta.len(),
channel_config.channel.name
)));
}
let mut buf = BytesMut::with_capacity(meta.len() as usize);
buf.resize(buf.capacity(), 0);
index_file.read_exact(&mut buf).await?;
match super::index::find_ge(range.beg, &buf[2..])? {
Some(o) => {
file.seek(SeekFrom::Start(o.1)).await?;
OpenedFile {
file: Some(file),
path,
positioned: true,
index: true,
nreads: 0,
}
}
None => OpenedFile {
file: None,
path,
positioned: false,
index: true,
nreads: 0,
},
}
}
Err(e) => match e.kind() {
ErrorKind::NotFound => {
let ts1 = Instant::now();
let res = super::index::position_static_len_datafile(file, range.beg).await?;
let ts2 = Instant::now();
if false {
// TODO collect for stats:
let dur = ts2.duration_since(ts1);
info!("position_static_len_datafile took ms {}", dur.as_millis());
}
file = res.0;
if res.1 {
OpenedFile {
file: Some(file),
path,
positioned: true,
index: false,
nreads: res.2,
}
} else {
OpenedFile {
file: None,
path,
positioned: false,
index: false,
nreads: res.2,
}
}
}
_ => Err(e)?,
},
let mut a = vec![];
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
let w = position_file(&path, range, &channel_config, false).await?;
if w.found {
a.push(w.file);
}
};
chtx.send(Ok(ret)).await?;
}
let h = OpenedFileSet { timebin: tb, files: a };
chtx.send(Ok(h)).await?;
p1 += 1;
if p1 >= timebins.len() {
break;
}
}
} else {
info!("Could not find some event before the requested range, fall back to standard file list.");
// Try to locate files according to non-expand-algorithm.
open_files_inner(chtx, range, &channel_config, node).await?;
}
@@ -481,7 +374,7 @@ fn expanded_file_list() {
match file {
Ok(k) => {
info!("opened file: {:?}", k);
paths.push(k.path.clone());
paths.push(k.files);
}
Err(e) => {
error!("error while trying to open {:?}", e);
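Each OpenedFileSet sent on the channel corresponds to one timebin and carries the split files that could be positioned inside the requested range. A minimal consumer sketch for the receiver returned by open_files above, with the structs reduced to the fields used here (not the crate's actual definitions; a plain String stands in for err::Error):

use std::path::PathBuf;

// Reduced stand-ins for OpenedFile / OpenedFileSet above; illustrative only.
struct OpenedFile {
    file: Option<tokio::fs::File>,
    path: PathBuf,
    positioned: bool,
}

struct OpenedFileSet {
    timebin: u64,
    files: Vec<OpenedFile>,
}

// Drain the receiver: one message per timebin, each with zero or more
// successfully positioned split files.
async fn drain(rx: async_channel::Receiver<Result<OpenedFileSet, String>>) {
    while let Ok(item) = rx.recv().await {
        match item {
            Ok(set) => {
                println!("timebin {}: {} positioned file(s)", set.timebin, set.files.len());
                for of in set.files {
                    if let Some(_f) = of.file {
                        // hand the already-seeked file to an EventChunker-like reader
                        println!("  would read {:?} (positioned={})", of.path, of.positioned);
                    }
                }
            }
            Err(e) => {
                eprintln!("open error: {}", e);
                break;
            }
        }
    }
}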

View File

@@ -1,10 +1,11 @@
use crate::dataopen::{open_expanded_files, open_files, OpenedFile};
use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet};
use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull};
use crate::file_content_stream;
use crate::mergeblobs::MergedBlobsStream;
use crate::{file_content_stream, HasSeenBeforeRangeCount};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::{LogItem, RangeCompletableItem, StreamItem};
use items::{LogItem, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ChannelConfig, NanoRange, Node};
@@ -13,10 +14,14 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::task::{Context, Poll};
pub trait InputTraits: Stream<Item = Sitemty<EventFull>> + HasSeenBeforeRangeCount {}
impl<T> InputTraits for T where T: Stream<Item = Sitemty<EventFull>> + HasSeenBeforeRangeCount {}
pub struct EventChunkerMultifile {
channel_config: ChannelConfig,
file_chan: async_channel::Receiver<Result<OpenedFile, Error>>,
evs: Option<EventChunker>,
file_chan: async_channel::Receiver<Result<OpenedFileSet, Error>>,
evs: Option<Pin<Box<dyn InputTraits + Send>>>,
buffer_size: usize,
event_chunker_conf: EventChunkerConf,
range: NanoRange,
@@ -108,27 +113,55 @@ impl Stream for EventChunkerMultifile {
},
None => match self.file_chan.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(file) => {
self.files_count += 1;
let path = file.path;
let item = LogItem::quick(Level::INFO, format!("handle file {:?}", path));
match file.file {
Some(file) => {
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
path,
self.max_ts.clone(),
self.expand,
);
self.evs = Some(chunker);
Ok(ofs) => {
self.files_count += ofs.files.len() as u32;
if ofs.files.len() == 1 {
let mut ofs = ofs;
let file = ofs.files.pop().unwrap();
let path = file.path;
let item = LogItem::quick(Level::INFO, format!("handle OFS {:?}", ofs));
match file.file {
Some(file) => {
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
path,
self.max_ts.clone(),
self.expand,
);
self.evs = Some(Box::pin(chunker));
}
None => {}
}
None => {}
Ready(Some(Ok(StreamItem::Log(item))))
} else if ofs.files.len() > 1 {
let item = LogItem::quick(Level::INFO, format!("handle OFS MULTIPLE {:?}", ofs));
let mut chunkers = vec![];
for of in ofs.files {
if let Some(file) = of.file {
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
of.path,
self.max_ts.clone(),
self.expand,
);
chunkers.push(chunker);
}
}
let merged = MergedBlobsStream::new(chunkers);
self.evs = Some(Box::pin(merged));
Ready(Some(Ok(StreamItem::Log(item))))
} else {
let item = LogItem::quick(Level::INFO, format!("handle OFS {:?} NO FILES", ofs));
Ready(Some(Ok(StreamItem::Log(item))))
}
Ready(Some(Ok(StreamItem::Log(item))))
}
Err(e) => {
self.errored = true;
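The central change in this file is that evs no longer holds a concrete EventChunker but a boxed trait object, so the same field can carry either a single chunker (one split file) or a MergedBlobsStream over several of them. A simplified sketch of that pattern, using plain u64 items instead of Sitemty<EventFull> and a sort-and-chain stand-in for the real merge:

use futures_core::Stream;
use futures_util::stream;
use std::pin::Pin;

type EventStream = Pin<Box<dyn Stream<Item = u64> + Send>>;

// Pick a stream depending on how many inputs a timebin yielded; the caller
// polls the result uniformly, regardless of which branch produced it.
fn select_stream(mut inputs: Vec<Vec<u64>>) -> Option<EventStream> {
    match inputs.len() {
        0 => None,
        1 => Some(Box::pin(stream::iter(inputs.pop().unwrap()))),
        _ => {
            // stand-in for MergedBlobsStream: pre-sort instead of a streaming merge
            let mut all: Vec<u64> = inputs.into_iter().flatten().collect();
            all.sort_unstable();
            Some(Box::pin(stream::iter(all)))
        }
    }
}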

View File

@@ -1,10 +1,10 @@
use crate::{FileChunkRead, NeedMinBuffer};
use crate::{FileChunkRead, HasSeenBeforeRangeCount, NeedMinBuffer};
use bitshuffle::bitshuffle_decompress;
use bytes::{Buf, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::{RangeCompletableItem, StatsItem, StreamItem};
use items::{Appendable, PushableIndex, RangeCompletableItem, StatsItem, StreamItem, WithLen, WithTimestamps};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
@@ -360,6 +360,44 @@ impl EventFull {
}
}
impl WithLen for EventFull {
fn len(&self) -> usize {
self.tss.len()
}
}
impl Appendable for EventFull {
fn empty() -> Self {
Self::empty()
}
// TODO expensive, get rid of it.
fn append(&mut self, src: &Self) {
self.tss.extend_from_slice(&src.tss);
self.pulses.extend_from_slice(&src.pulses);
self.decomps.extend_from_slice(&src.decomps);
self.scalar_types.extend_from_slice(&src.scalar_types);
self.be.extend_from_slice(&src.be);
}
}
impl WithTimestamps for EventFull {
fn ts(&self, ix: usize) -> u64 {
self.tss[ix]
}
}
impl PushableIndex for EventFull {
// TODO check all use cases, can't we move?
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.pulses.push(src.pulses[ix]);
self.decomps.push(src.decomps[ix].clone());
self.scalar_types.push(src.scalar_types[ix].clone());
self.be.push(src.be[ix]);
}
}
impl Stream for EventChunker {
type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;
@@ -442,3 +480,9 @@ impl Stream for EventChunker {
}
}
}
impl HasSeenBeforeRangeCount for EventChunker {
fn seen_before_range_count(&self) -> usize {
self.seen_before_range_count()
}
}
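These impls are what allow EventFull to flow through MergedBlobsStream: the merge only needs an item's length, per-event timestamps, a way to copy one event across batches, and a way to start an empty batch. A self-contained sketch with simplified local traits (the real ones come from the items crate and carry more methods):

// Simplified local stand-ins for the items-crate traits used by the merge.
trait WithLen {
    fn len(&self) -> usize;
}
trait WithTimestamps {
    fn ts(&self, ix: usize) -> u64;
}
trait Appendable {
    fn empty() -> Self;
}
trait PushableIndex {
    fn push_index(&mut self, src: &Self, ix: usize);
}

// Toy event batch: parallel vectors with one entry per event, like EventFull.
struct Batch {
    tss: Vec<u64>,
    payload: Vec<Vec<u8>>,
}

impl WithLen for Batch {
    fn len(&self) -> usize {
        self.tss.len()
    }
}
impl WithTimestamps for Batch {
    fn ts(&self, ix: usize) -> u64 {
        self.tss[ix]
    }
}
impl Appendable for Batch {
    fn empty() -> Self {
        Batch { tss: vec![], payload: vec![] }
    }
}
impl PushableIndex for Batch {
    // Copy event ix from src into self, keeping the parallel vectors in sync.
    fn push_index(&mut self, src: &Self, ix: usize) {
        self.tss.push(src.tss[ix]);
        self.payload.push(src.payload[ix].clone());
    }
}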

View File

@@ -31,6 +31,7 @@ pub mod frame;
pub mod gen;
pub mod index;
pub mod merge;
pub mod mergeblobs;
pub mod paths;
pub mod raw;
pub mod streamlog;
@@ -283,7 +284,10 @@ fn unused_raw_concat_channel_read_stream_file_pipe(
let chrx = open_files(&range, &channel_config, node);
while let Ok(file) = chrx.recv().await {
let mut file = match file {
Ok(k) => k.file.unwrap(),
Ok(mut k) => {
k.files.truncate(1);
k.files.pop().unwrap().file.unwrap()
}
Err(_) => break
};
loop {
@@ -419,7 +423,8 @@ impl Stream for NeedMinBuffer {
}
}
pub fn raw_concat_channel_read_stream(
#[allow(dead_code)]
fn raw_concat_channel_read_stream(
query: &netpod::AggQuerySingleChannel,
node: Node,
) -> impl Stream<Item = Result<Bytes, Error>> + Send {
@@ -443,7 +448,8 @@ pub fn raw_concat_channel_read_stream(
}
}
pub fn raw_concat_channel_read_stream_timebin(
#[allow(dead_code)]
fn raw_concat_channel_read_stream_timebin(
query: &netpod::AggQuerySingleChannel,
node: Node,
) -> impl Stream<Item = Result<Bytes, Error>> {
@@ -507,3 +513,7 @@ impl ChannelConfigExt for ChannelConfig {
ret
}
}
pub trait HasSeenBeforeRangeCount {
fn seen_before_range_count(&self) -> usize;
}

disk/src/mergeblobs.rs Normal file
View File

@@ -0,0 +1,249 @@
use crate::HasSeenBeforeRangeCount;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::{
Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen, WithTimestamps,
};
use netpod::log::*;
use netpod::EventDataReadStats;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
enum MergedCurVal<T> {
None,
Finish,
Val(T),
}
pub struct MergedBlobsStream<S, I>
where
S: Stream<Item = Sitemty<I>> + Unpin,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
{
inps: Vec<S>,
current: Vec<MergedCurVal<I>>,
ixs: Vec<usize>,
errored: bool,
completed: bool,
batch: I,
ts_last_emit: u64,
range_complete_observed: Vec<bool>,
range_complete_observed_all: bool,
range_complete_observed_all_emitted: bool,
data_emit_complete: bool,
batch_size: usize,
logitems: VecDeque<LogItem>,
event_data_read_stats_items: VecDeque<EventDataReadStats>,
}
impl<S, I> MergedBlobsStream<S, I>
where
S: Stream<Item = Sitemty<I>> + Unpin,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
{
pub fn new(inps: Vec<S>) -> Self {
let n = inps.len();
let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
Self {
inps,
current: current,
ixs: vec![0; n],
errored: false,
completed: false,
batch: I::empty(),
ts_last_emit: 0,
range_complete_observed: vec![false; n],
range_complete_observed_all: false,
range_complete_observed_all_emitted: false,
data_emit_complete: false,
batch_size: 64,
logitems: VecDeque::new(),
event_data_read_stats_items: VecDeque::new(),
}
}
fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
use Poll::*;
let mut pending = 0;
for i1 in 0..self.inps.len() {
match self.current[i1] {
MergedCurVal::None => {
'l1: loop {
break match self.inps[i1].poll_next_unpin(cx) {
Ready(Some(Ok(k))) => match k {
StreamItem::Log(item) => {
self.logitems.push_back(item);
continue 'l1;
}
StreamItem::Stats(item) => {
match item {
StatsItem::EventDataReadStats(item) => {
self.event_data_read_stats_items.push_back(item);
}
}
continue 'l1;
}
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
self.range_complete_observed[i1] = true;
let d = self.range_complete_observed.iter().filter(|&&k| k).count();
if d == self.range_complete_observed.len() {
self.range_complete_observed_all = true;
debug!("MergedStream range_complete d {} COMPLETE", d);
} else {
trace!("MergedStream range_complete d {}", d);
}
continue 'l1;
}
RangeCompletableItem::Data(item) => {
self.ixs[i1] = 0;
self.current[i1] = MergedCurVal::Val(item);
}
},
},
Ready(Some(Err(e))) => {
// TODO emit this error, consider this stream as done, anything more to do here?
//self.current[i1] = CurVal::Err(e);
self.errored = true;
return Ready(Err(e));
}
Ready(None) => {
self.current[i1] = MergedCurVal::Finish;
}
Pending => {
pending += 1;
}
};
}
}
_ => (),
}
}
if pending > 0 {
Pending
} else {
Ready(Ok(()))
}
}
}
impl<S, I> Stream for MergedBlobsStream<S, I>
where
S: Stream<Item = Sitemty<I>> + Unpin,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
{
type Item = Sitemty<I>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
break if self.completed {
panic!("poll_next on completed");
} else if self.errored {
self.completed = true;
Ready(None)
} else if let Some(item) = self.logitems.pop_front() {
Ready(Some(Ok(StreamItem::Log(item))))
} else if let Some(item) = self.event_data_read_stats_items.pop_front() {
Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))))
} else if self.data_emit_complete {
if self.range_complete_observed_all {
if self.range_complete_observed_all_emitted {
self.completed = true;
Ready(None)
} else {
self.range_complete_observed_all_emitted = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
}
} else {
self.completed = true;
Ready(None)
}
} else {
// Can only run logic if all streams are either finished, errored or have some current value.
match self.replenish(cx) {
Ready(Ok(_)) => {
let mut lowest_ix = usize::MAX;
let mut lowest_ts = u64::MAX;
for i1 in 0..self.inps.len() {
if let MergedCurVal::Val(val) = &self.current[i1] {
let u = self.ixs[i1];
if u >= val.len() {
self.ixs[i1] = 0;
self.current[i1] = MergedCurVal::None;
continue 'outer;
} else {
let ts = val.ts(u);
if ts < lowest_ts {
lowest_ix = i1;
lowest_ts = ts;
}
}
}
}
if lowest_ix == usize::MAX {
if self.batch.len() != 0 {
let emp = I::empty();
let ret = std::mem::replace(&mut self.batch, emp);
self.data_emit_complete = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
self.data_emit_complete = true;
continue 'outer;
}
} else {
assert!(lowest_ts >= self.ts_last_emit);
let emp = I::empty();
let mut local_batch = std::mem::replace(&mut self.batch, emp);
self.ts_last_emit = lowest_ts;
let rix = self.ixs[lowest_ix];
match &self.current[lowest_ix] {
MergedCurVal::Val(val) => {
local_batch.push_index(val, rix);
}
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
}
self.batch = local_batch;
self.ixs[lowest_ix] += 1;
let curlen = match &self.current[lowest_ix] {
MergedCurVal::Val(val) => val.len(),
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
};
if self.ixs[lowest_ix] >= curlen {
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedCurVal::None;
}
if self.batch.len() >= self.batch_size {
let emp = I::empty();
let ret = std::mem::replace(&mut self.batch, emp);
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
continue 'outer;
}
}
}
Ready(Err(e)) => {
self.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
};
}
}
}
impl<S, I> HasSeenBeforeRangeCount for MergedBlobsStream<S, I>
where
S: Stream<Item = Sitemty<I>> + Unpin,
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen,
{
fn seen_before_range_count(&self) -> usize {
// TODO (only for debug)
0
}
}
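The core of MergedBlobsStream is the selection step: once every input is either finished or has a current batch, take the input whose next event has the lowest timestamp, copy that single event into the output batch, and flush whenever batch_size events have been collected. A synchronous sketch of that selection over pre-loaded, already time-ordered per-input vectors (the real code performs the same loop over polled batches):

// Merge k time-ordered inputs, always consuming the globally lowest next
// timestamp, and emit output batches of at most `batch_size` events.
fn merge_batches(inputs: Vec<Vec<u64>>, batch_size: usize) -> Vec<Vec<u64>> {
    let mut ixs = vec![0usize; inputs.len()];
    let mut out = Vec::new();
    let mut batch = Vec::new();
    loop {
        // Find the input with the lowest not-yet-consumed timestamp.
        let mut lowest_ix = usize::MAX;
        let mut lowest_ts = u64::MAX;
        for (i, inp) in inputs.iter().enumerate() {
            if ixs[i] < inp.len() && inp[ixs[i]] < lowest_ts {
                lowest_ts = inp[ixs[i]];
                lowest_ix = i;
            }
        }
        if lowest_ix == usize::MAX {
            // All inputs exhausted: emit the final partial batch, if any.
            if !batch.is_empty() {
                out.push(std::mem::take(&mut batch));
            }
            return out;
        }
        batch.push(lowest_ts);
        ixs[lowest_ix] += 1;
        if batch.len() >= batch_size {
            out.push(std::mem::take(&mut batch));
        }
    }
}

// merge_batches(vec![vec![1, 4, 9], vec![2, 3, 10]], 4)
// yields [[1, 2, 3, 4], [9, 10]].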

View File

@@ -1,10 +1,12 @@
use err::Error;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::MS;
use netpod::{ChannelConfig, Nanos, Node};
use std::path::PathBuf;
// TODO remove this
pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
//let pre = "/data/sf-databuffer/daq_swissfel";
node.data_base_path
.join(format!("{}_{}", node.ksprefix, config.keyspace))
.join("byTime")
@@ -14,6 +16,56 @@ pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> Pa
.join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS))
}
/**
Return potential datafile paths for the given timebin.
They are only "potential" because the files are not opened here: they may vanish
before they are actually used, and the timebin itself may not exist at all.
*/
pub async fn datapaths_for_timebin(
timebin: u64,
config: &netpod::ChannelConfig,
node: &Node,
) -> Result<Vec<PathBuf>, Error> {
let timebin_path = node
.data_base_path
.join(format!("{}_{}", node.ksprefix, config.keyspace))
.join("byTime")
.join(config.channel.name.clone())
.join(format!("{:019}", timebin));
let rd = tokio::fs::read_dir(timebin_path).await?;
let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
let mut splits = vec![];
while let Some(e) = rd.next().await {
let e = e?;
let dn = e
.file_name()
.into_string()
.map_err(|s| Error::with_msg(format!("Bad OS path {:?}", s)))?;
if dn.len() != 10 {
return Err(Error::with_msg(format!("bad split dirname {:?}", e)));
}
let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
if vv == 10 {
splits.push(dn.parse::<u64>()?);
}
}
let mut ret = vec![];
for split in splits {
let path = node
.data_base_path
.join(format!("{}_{}", node.ksprefix, config.keyspace))
.join("byTime")
.join(config.channel.name.clone())
.join(format!("{:019}", timebin))
.join(format!("{:010}", split))
.join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS));
ret.push(path);
}
info!("\n\ndatapaths_for_timebin returns: {:?}\n", ret);
Ok(ret)
}
pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
let ret = node
.data_base_path

View File

@@ -4,6 +4,7 @@ use err::Error;
use http::{Method, StatusCode};
use hyper::{Body, Client, Request, Response};
use itertools::Itertools;
use netpod::{log::*, NodeConfigCached, APP_OCTET};
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
@@ -461,3 +462,25 @@ pub async fn proxy_distribute_v1(req: Request<Body>) -> Result<Response<Body>, E
});
Ok(res)
}
pub async fn api1_binary_events(req: Request<Body>, _node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("api1_binary_events headers: {:?}", req.headers());
let accept_def = "";
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_OCTET {
// Ok(plain_events_binary(req, node_config).await.map_err(|e| {
// error!("{:?}", e);
// e
// })?)
let e = Error::with_msg(format!("unexpected Accept: {:?}", accept));
error!("{:?}", e);
Err(e)
} else {
let e = Error::with_msg(format!("unexpected Accept: {:?}", accept));
error!("{:?}", e);
Err(e)
}
}
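api1_binary_events only inspects the Accept header for now; both branches reject the request and the binary path is left commented out. A minimal sketch of that header inspection, assuming APP_OCTET is the usual "application/octet-stream" constant from netpod (an assumption, not verified here):

use http::{header, Request};

// Return true when the client explicitly asked for a binary event stream.
fn wants_octet_stream<B>(req: &Request<B>) -> bool {
    req.headers()
        .get(header::ACCEPT)
        .and_then(|v| v.to_str().ok())
        .map(|v| v == "application/octet-stream")
        .unwrap_or(false)
}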

View File

@@ -240,6 +240,12 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/1/query" {
if req.method() == Method::POST {
Ok(api1::api1_binary_events(req, &node_config).await?)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
}
} else if path.starts_with("/api/1/documentation/") {
if req.method() == Method::GET {
api_1_docs(path)