Move some types to crate streams

Dominik Werder
2022-11-15 11:07:40 +01:00
parent 20e2c20697
commit fb78f1887e
37 changed files with 1002 additions and 954 deletions
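The hunks below all follow one pattern: event-chunking and frame-handling types move out of the disk crate into the streams crate (with EventFull going to the items crate), and call sites switch their use paths. A condensed sketch of the path changes visible in the hunks, old paths in comments, new paths as the use lines that now appear at the call sites:

// Before (paths removed in the hunks below):
//   crate::eventchunker::{EventChunker, EventChunkerConf, EventFull}
//   crate::frame::inmem::InMemoryFrameAsyncReadStream
//   crate::raw::client::{x_processed_stream_from_node, x_processed_event_blobs_stream_from_node}
//   crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}
//   crate::{FileChunkRead, NeedMinBuffer}   (NeedMinBuffer's new home is not visible in this excerpt)
// After (paths added in the hunks below):
use items::eventfull::EventFull;
use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use streams::eventchunker::{EventChunker, EventChunkerConf};
use streams::filechunkread::FileChunkRead;
use streams::frames::inmem::InMemoryFrameAsyncReadStream;
use streams::rangefilter::RangeFilter;
use streams::tcprawclient::{x_processed_event_blobs_stream_from_node, x_processed_stream_from_node};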

View File

@@ -1,7 +1,7 @@
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::EventChunkerConf;
use netpod::{test_data_base_path_databuffer, timeunits::*, SfDatabuffer};
use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape};
use streams::eventchunker::EventChunkerConf;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};

View File

@@ -1,7 +1,6 @@
use crate::agg::binnedt::TBinnerStream;
use crate::binned::query::PreBinnedQuery;
use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead};
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
@@ -19,6 +18,7 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use streams::frames::inmem::InMemoryFrameAsyncReadStream;
use url::Url;
pub struct FetchedPreBinned<TBT> {

View File

@@ -1,9 +1,9 @@
use crate::agg::enp::Identity;
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::EventFull;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::eventfull::EventFull;
use items::eventsitem::EventsItem;
use items::numops::{BoolNum, NumOps, StringNum};
use items::plainevents::{PlainEvents, ScalarPlainEvents};

View File

@@ -19,24 +19,25 @@ pub mod read3;
pub mod read4;
pub mod streamlog;
use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use bytes::{Bytes, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use futures_util::{FutureExt, StreamExt, TryFutureExt};
use netpod::histo::HistoLog2;
use netpod::{log::*, ReadSys};
use futures_util::{FutureExt, TryFutureExt};
use netpod::log::*;
use netpod::ReadSys;
use netpod::{ChannelConfig, DiskIoTune, Node, Shape};
use std::collections::VecDeque;
use std::future::Future;
use std::io::SeekFrom;
use std::mem;
use std::os::unix::prelude::AsRawFd;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{fmt, mem};
use std::time::Instant;
use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use streams::filechunkread::FileChunkRead;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, ReadBuf};
use tokio::sync::mpsc;
@@ -150,27 +151,6 @@ impl FusedFuture for Fopen1 {
unsafe impl Send for Fopen1 {}
pub struct FileChunkRead {
buf: BytesMut,
duration: Duration,
}
impl FileChunkRead {
pub fn into_buf(self) -> BytesMut {
self.buf
}
}
impl fmt::Debug for FileChunkRead {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FileChunkRead")
.field("buf.len", &self.buf.len())
.field("buf.cap", &self.buf.capacity())
.field("duration", &self.duration)
.finish()
}
}
pub struct FileContentStream {
file: File,
disk_io_tune: DiskIoTune,
@@ -229,17 +209,11 @@ impl Stream for FileContentStream {
self.read_going = false;
let ts2 = Instant::now();
if nread == 0 {
let ret = FileChunkRead {
buf,
duration: ts2.duration_since(self.ts1),
};
let ret = FileChunkRead::with_buf_dur(buf, ts2.duration_since(self.ts1));
self.done = true;
Ready(Some(Ok(ret)))
} else {
let ret = FileChunkRead {
buf,
duration: ts2.duration_since(self.ts1),
};
let ret = FileChunkRead::with_buf_dur(buf, ts2.duration_since(self.ts1));
if false && self.nlog < 6 {
self.nlog += 1;
info!("{:?} ret {:?}", self.disk_io_tune, ret);
@@ -316,12 +290,8 @@ impl Stream for FileContentStream2 {
}
FCS2::Reading((ref mut buf, ref mut fut)) => match fut.poll_unpin(cx) {
Ready(Ok(n)) => {
let mut buf2 = BytesMut::new();
std::mem::swap(buf as &mut BytesMut, &mut buf2);
let item = FileChunkRead {
buf: buf2,
duration: Duration::from_millis(0),
};
let buf2 = std::mem::replace(buf as &mut BytesMut, BytesMut::new());
let item = FileChunkRead::with_buf(buf2);
if n == 0 {
self.done = true;
} else {
@@ -429,17 +399,11 @@ impl Stream for FileContentStream3 {
FCS3::ReadingSimple => match self.read_fut.poll_unpin(cx) {
Ready(Ok(res)) => {
if res.eof {
let item = FileChunkRead {
buf: res.buf,
duration: Duration::from_millis(0),
};
let item = FileChunkRead::with_buf(res.buf);
self.done = true;
Ready(Some(Ok(item)))
} else {
let item = FileChunkRead {
buf: res.buf,
duration: Duration::from_millis(0),
};
let item = FileChunkRead::with_buf(res.buf);
let fd = self.file.as_raw_fd();
let count = self.disk_io_tune.read_buffer_len as u64;
self.read_fut = Box::pin(read3::Read3::get().read(fd, self.file_pos, count));
@@ -489,10 +453,7 @@ impl Stream for FileContentStream3 {
self.eof = true;
}
}
let res = FileChunkRead {
buf: rr.buf,
duration: Duration::from_millis(0),
};
let res = FileChunkRead::with_buf(rr.buf);
Ready(Some(Ok(res)))
}
Err(e) => {
@@ -619,10 +580,7 @@ impl Stream for FileContentStream4 {
>(rm)
};
self.recv_fut = Box::pin(rm.recv()) as _;
let item = FileChunkRead {
buf: k.buf,
duration: Duration::from_millis(0),
};
let item = FileChunkRead::with_buf(k.buf);
Ready(Some(Ok(item)))
}
Err(e) => {
@@ -669,110 +627,6 @@ pub fn file_content_stream(
}
}
pub struct NeedMinBuffer {
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
need_min: u32,
left: Option<FileChunkRead>,
buf_len_histo: HistoLog2,
errored: bool,
completed: bool,
}
impl NeedMinBuffer {
pub fn new(inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>) -> Self {
Self {
inp: inp,
need_min: 1,
left: None,
buf_len_histo: HistoLog2::new(8),
errored: false,
completed: false,
}
}
pub fn put_back(&mut self, buf: FileChunkRead) {
assert!(self.left.is_none());
self.left = Some(buf);
}
pub fn set_need_min(&mut self, need_min: u32) {
self.need_min = need_min;
}
}
// TODO collect somewhere else
impl Drop for NeedMinBuffer {
fn drop(&mut self) {
debug!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo);
}
}
impl Stream for NeedMinBuffer {
type Item = Result<FileChunkRead, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if self.completed {
panic!("NeedMinBuffer poll_next on completed");
} else if self.errored {
self.completed = true;
return Ready(None);
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(fcr))) => {
self.buf_len_histo.ingest(fcr.buf.len() as u32);
//info!("NeedMinBuffer got buf len {}", fcr.buf.len());
match self.left.take() {
Some(mut lfcr) => {
// TODO measure:
lfcr.buf.unsplit(fcr.buf);
lfcr.duration += fcr.duration;
let fcr = lfcr;
if fcr.buf.len() as u32 >= self.need_min {
//info!("with left ready len {} need_min {}", buf.len(), self.need_min);
Ready(Some(Ok(fcr)))
} else {
//info!("with left not enough len {} need_min {}", buf.len(), self.need_min);
self.left.replace(fcr);
continue;
}
}
None => {
if fcr.buf.len() as u32 >= self.need_min {
//info!("simply ready len {} need_min {}", buf.len(), self.need_min);
Ready(Some(Ok(fcr)))
} else {
//info!("no previous leftover, need more len {} need_min {}", buf.len(), self.need_min);
self.left.replace(fcr);
continue;
}
}
}
}
Ready(Some(Err(e))) => {
self.errored = true;
Ready(Some(Err(e.into())))
}
Ready(None) => {
// TODO collect somewhere
debug!("NeedMinBuffer histo: {:?}", self.buf_len_histo);
Ready(None)
}
Pending => Pending,
}
};
}
}
}
pub mod dtflags {
pub const COMPRESSION: u8 = 0x80;
pub const ARRAY: u8 = 0x40;
pub const BIG_ENDIAN: u8 = 0x20;
pub const SHAPE: u8 = 0x10;
}
trait ChannelConfigExt {
fn dtflags(&self) -> u8;
}
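FileChunkRead and NeedMinBuffer leave this file; only FileChunkRead's new home (streams::filechunkread) is visible in this excerpt, through the constructor calls the hunks above switch to. A sketch of what the moved type presumably provides, reconstructed from the removed definition and the with_buf/with_buf_dur call sites — an assumption, not the actual streams-side file:

use bytes::BytesMut;
use std::fmt;
use std::time::Duration;

pub struct FileChunkRead {
    buf: BytesMut,
    duration: Duration,
}

impl FileChunkRead {
    // Buffer only; the duration defaults to zero, matching the former
    // Duration::from_millis(0) at the call sites above (assumed).
    pub fn with_buf(buf: BytesMut) -> Self {
        Self { buf, duration: Duration::ZERO }
    }

    // Buffer plus the measured read duration, as FileContentStream passes it.
    pub fn with_buf_dur(buf: BytesMut, duration: Duration) -> Self {
        Self { buf, duration }
    }

    pub fn into_buf(self) -> BytesMut {
        self.buf
    }
}

impl fmt::Debug for FileChunkRead {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("FileChunkRead")
            .field("buf.len", &self.buf.len())
            .field("buf.cap", &self.buf.capacity())
            .field("duration", &self.duration)
            .finish()
    }
}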

View File

@@ -1,15 +1,16 @@
use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet};
use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull};
use crate::merge::MergedStream;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::eventfull::EventFull;
use items::{LogItem, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ChannelConfig, DiskIoTune, NanoRange, Node};
use std::pin::Pin;
use std::task::{Context, Poll};
use streams::eventchunker::{EventChunker, EventChunkerConf};
use streams::rangefilter::RangeFilter;
pub trait InputTraits: Stream<Item = Sitemty<EventFull>> {}
@@ -240,13 +241,15 @@ impl Stream for EventChunkerMultifile {
#[cfg(test)]
mod test {
use crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
use crate::eventblobs::EventChunkerMultifile;
use err::Error;
use futures_util::StreamExt;
use items::{RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::timeunits::{DAY, MS};
use netpod::{log::*, DiskIoTune};
use netpod::DiskIoTune;
use netpod::{ByteSize, ChannelConfig, Nanos};
use streams::eventchunker::EventChunkerConf;
use streams::rangefilter::RangeFilter;
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec<u64>), Error> {

View File

@@ -1,756 +1 @@
use crate::{FileChunkRead, NeedMinBuffer};
use bitshuffle::bitshuffle_decompress;
use bytes::{Buf, BytesMut};
use err::Error;
use futures_util::{Stream, StreamExt};
use items::{
Appendable, ByteEstimate, Clearable, FrameTypeInnerStatic, PushableIndex, RangeCompletableItem, StatsItem,
StreamItem, WithLen, WithTimestamps,
};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
use parse::channelconfig::CompressionMethod;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
pub struct EventChunker {
inp: NeedMinBuffer,
state: DataFileState,
need_min: u32,
channel_config: ChannelConfig,
errored: bool,
completed: bool,
range: NanoRange,
stats_conf: EventChunkerConf,
seen_beyond_range: bool,
sent_beyond_range: bool,
data_emit_complete: bool,
final_stats_sent: bool,
parsed_bytes: u64,
dbg_path: PathBuf,
max_ts: u64,
expand: bool,
do_decompress: bool,
decomp_dt_histo: HistoLog2,
item_len_emit_histo: HistoLog2,
seen_before_range_count: usize,
seen_after_range_count: usize,
unordered_warn_count: usize,
repeated_ts_warn_count: usize,
}
impl Drop for EventChunker {
fn drop(&mut self) {
// TODO collect somewhere
debug!(
"EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}",
self.decomp_dt_histo, self.item_len_emit_histo
);
}
}
enum DataFileState {
FileHeader,
Event,
}
struct ParseResult {
events: EventFull,
parsed_bytes: u64,
}
#[derive(Clone, Debug)]
pub struct EventChunkerConf {
pub disk_stats_every: ByteSize,
}
impl EventChunkerConf {
pub fn new(disk_stats_every: ByteSize) -> Self {
Self { disk_stats_every }
}
}
impl EventChunker {
// TODO `expand` flag usage
pub fn from_start(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: ChannelConfig,
range: NanoRange,
stats_conf: EventChunkerConf,
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
) -> Self {
trace!("EventChunker::from_start");
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6);
Self {
inp,
state: DataFileState::FileHeader,
need_min: 6,
channel_config,
errored: false,
completed: false,
range,
stats_conf,
seen_beyond_range: false,
sent_beyond_range: false,
data_emit_complete: false,
final_stats_sent: false,
parsed_bytes: 0,
dbg_path,
max_ts: 0,
expand,
do_decompress,
decomp_dt_histo: HistoLog2::new(8),
item_len_emit_histo: HistoLog2::new(0),
seen_before_range_count: 0,
seen_after_range_count: 0,
unordered_warn_count: 0,
repeated_ts_warn_count: 0,
}
}
// TODO `expand` flag usage
pub fn from_event_boundary(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: ChannelConfig,
range: NanoRange,
stats_conf: EventChunkerConf,
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
) -> Self {
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
ret
}
fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf))
}
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
let mut ret = EventFull::empty();
let mut parsed_bytes = 0;
use byteorder::{ReadBytesExt, BE};
loop {
if (buf.len() as u32) < self.need_min {
break;
}
match self.state {
DataFileState::FileHeader => {
if buf.len() < 6 {
Err(Error::with_msg("need min 6 for FileHeader"))?;
}
let mut sl = std::io::Cursor::new(buf.as_ref());
let fver = sl.read_i16::<BE>().unwrap();
if fver != 0 {
Err(Error::with_msg("unexpected data file version"))?;
}
let len = sl.read_i32::<BE>().unwrap();
if len <= 0 || len >= 128 {
Err(Error::with_msg("large channel header len"))?;
}
let totlen = len as usize + 2;
if buf.len() < totlen {
self.need_min = totlen as u32;
break;
} else {
sl.advance(len as usize - 8);
let len2 = sl.read_i32::<BE>().unwrap();
if len != len2 {
Err(Error::with_msg("channel header len mismatch"))?;
}
String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?;
self.state = DataFileState::Event;
self.need_min = 4;
buf.advance(totlen);
parsed_bytes += totlen as u64;
}
}
DataFileState::Event => {
let p0 = 0;
let mut sl = std::io::Cursor::new(buf.as_ref());
let len = sl.read_i32::<BE>().unwrap();
if len < 20 || len > 1024 * 1024 * 20 {
Err(Error::with_msg("unexpected large event chunk"))?;
}
let len = len as u32;
if (buf.len() as u32) < len {
self.need_min = len as u32;
break;
} else {
let mut sl = std::io::Cursor::new(buf.as_ref());
let len1b = sl.read_i32::<BE>().unwrap();
assert!(len == len1b as u32);
let _ttl = sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64;
let pulse = sl.read_i64::<BE>().unwrap() as u64;
if ts == self.max_ts {
if self.repeated_ts_warn_count < 20 {
let msg = format!(
"EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
self.repeated_ts_warn_count,
ts / SEC,
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.channel_config.shape,
self.dbg_path
);
warn!("{}", msg);
self.repeated_ts_warn_count += 1;
}
}
if ts < self.max_ts {
if self.unordered_warn_count < 20 {
let msg = format!(
"EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
self.unordered_warn_count,
ts / SEC,
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.channel_config.shape,
self.dbg_path
);
warn!("{}", msg);
self.unordered_warn_count += 1;
let e = Error::with_public_msg_no_trace(msg);
return Err(e);
}
}
self.max_ts = ts;
if ts >= self.range.end {
self.seen_after_range_count += 1;
if !self.expand || self.seen_after_range_count >= 2 {
self.seen_beyond_range = true;
self.data_emit_complete = true;
break;
}
}
if ts < self.range.beg {
self.seen_before_range_count += 1;
if self.seen_before_range_count > 1 {
let msg = format!(
"seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}",
ts / SEC,
ts % SEC,
self.range.beg / SEC,
self.range.beg % SEC,
self.range.end / SEC,
self.range.end % SEC,
pulse,
self.channel_config.shape,
self.dbg_path
);
warn!("{}", msg);
let e = Error::with_public_msg(msg);
Err(e)?;
}
}
let _ioc_ts = sl.read_i64::<BE>().unwrap();
let status = sl.read_i8().unwrap();
let severity = sl.read_i8().unwrap();
let optional = sl.read_i32::<BE>().unwrap();
if status != 0 {
Err(Error::with_msg(format!("status != 0: {}", status)))?;
}
if severity != 0 {
Err(Error::with_msg(format!("severity != 0: {}", severity)))?;
}
if optional != -1 {
Err(Error::with_msg(format!("optional != -1: {}", optional)))?;
}
let type_flags = sl.read_u8().unwrap();
let type_index = sl.read_u8().unwrap();
if type_index > 13 {
Err(Error::with_msg(format!("type_index: {}", type_index)))?;
}
let scalar_type = ScalarType::from_dtype_index(type_index)?;
use super::dtflags::*;
let is_compressed = type_flags & COMPRESSION != 0;
let is_array = type_flags & ARRAY != 0;
let is_big_endian = type_flags & BIG_ENDIAN != 0;
let is_shaped = type_flags & SHAPE != 0;
if let Shape::Wave(_) = self.channel_config.shape {
if !is_array {
Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?;
}
}
let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 };
let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
assert!(compression_method <= 0);
assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
let mut shape_lens = [0, 0, 0, 0];
for i1 in 0..shape_dim {
shape_lens[i1 as usize] = sl.read_u32::<BE>().unwrap();
}
let shape_this = {
if is_shaped {
if shape_dim == 1 {
Shape::Wave(shape_lens[0])
} else if shape_dim == 2 {
Shape::Image(shape_lens[0], shape_lens[1])
} else {
err::todoval()
}
} else {
Shape::Scalar
}
};
let comp_this = if is_compressed {
if compression_method == 0 {
Some(CompressionMethod::BitshuffleLZ4)
} else {
err::todoval()
}
} else {
None
};
let p1 = sl.position();
let k1 = len as u64 - (p1 - p0) - 4;
if is_compressed {
//debug!("event ts {} is_compressed {}", ts, is_compressed);
let value_bytes = sl.read_u64::<BE>().unwrap();
let block_size = sl.read_u32::<BE>().unwrap();
//debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
match self.channel_config.shape {
Shape::Scalar => {
assert!(value_bytes < 1024 * 1);
}
Shape::Wave(_) => {
assert!(value_bytes < 1024 * 64);
}
Shape::Image(_, _) => {
assert!(value_bytes < 1024 * 1024 * 20);
}
}
assert!(block_size <= 1024 * 32);
let type_size = scalar_type.bytes() as u32;
let ele_count = value_bytes / type_size as u64;
let ele_size = type_size;
match self.channel_config.shape {
Shape::Scalar => {
if is_array {
Err(Error::with_msg(format!(
"ChannelConfig expects Scalar but we find event is_array"
)))?;
}
}
Shape::Wave(dim1count) => {
if dim1count != ele_count as u32 {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config.shape, ele_count,
)))?;
}
}
Shape::Image(n1, n2) => {
let nt = n1 as usize * n2 as usize;
if nt != ele_count as usize {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config.shape, ele_count,
)))?;
}
}
}
let decomp = {
if self.do_decompress {
let ts1 = Instant::now();
let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = BytesMut::with_capacity(decomp_bytes);
unsafe {
decomp.set_len(decomp_bytes);
}
// TODO limit the buf slice range
match bitshuffle_decompress(
&buf.as_ref()[(p1 as usize + 12)..(p1 as usize + k1 as usize)],
&mut decomp,
ele_count as usize,
ele_size as usize,
0,
) {
Ok(c1) => {
assert!(c1 as u64 + 12 == k1);
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros());
Some(decomp)
}
Err(e) => {
return Err(Error::with_msg(format!("decompression failed {:?}", e)))?;
}
}
} else {
None
}
};
ret.add_event(
ts,
pulse,
buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(),
decomp,
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
} else {
if len < p1 as u32 + 4 {
let msg = format!("uncomp len: {} p1: {}", len, p1);
Err(Error::with_msg(msg))?;
}
let vlen = len - p1 as u32 - 4;
// TODO in this case, decomp and comp is the same and not needed.
let decomp = BytesMut::from(&buf[p1 as usize..(p1 as u32 + vlen) as usize]);
ret.add_event(
ts,
pulse,
buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(),
Some(decomp),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
}
buf.advance(len as usize);
parsed_bytes += len as u64;
self.need_min = 4;
}
}
}
}
Ok(ParseResult {
events: ret,
parsed_bytes,
})
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct EventFull {
pub tss: Vec<u64>,
pub pulses: Vec<u64>,
pub blobs: Vec<Vec<u8>>,
#[serde(serialize_with = "decomps_ser", deserialize_with = "decomps_de")]
// TODO allow access to `decomps` via method which checks first if `blobs` is already the decomp.
pub decomps: Vec<Option<BytesMut>>,
pub scalar_types: Vec<ScalarType>,
pub be: Vec<bool>,
pub shapes: Vec<Shape>,
pub comps: Vec<Option<CompressionMethod>>,
}
fn decomps_ser<S>(t: &Vec<Option<BytesMut>>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let a: Vec<_> = t
.iter()
.map(|k| match k {
None => None,
Some(j) => Some(j[..].to_vec()),
})
.collect();
Serialize::serialize(&a, s)
}
fn decomps_de<'de, D>(d: D) -> Result<Vec<Option<BytesMut>>, D::Error>
where
D: Deserializer<'de>,
{
let a: Vec<Option<Vec<u8>>> = Deserialize::deserialize(d)?;
let a = a
.iter()
.map(|k| match k {
None => None,
Some(j) => {
let mut a = BytesMut::new();
a.extend_from_slice(&j);
Some(a)
}
})
.collect();
Ok(a)
}
impl EventFull {
pub fn empty() -> Self {
Self {
tss: vec![],
pulses: vec![],
blobs: vec![],
decomps: vec![],
scalar_types: vec![],
be: vec![],
shapes: vec![],
comps: vec![],
}
}
fn add_event(
&mut self,
ts: u64,
pulse: u64,
blob: Vec<u8>,
decomp: Option<BytesMut>,
scalar_type: ScalarType,
be: bool,
shape: Shape,
comp: Option<CompressionMethod>,
) {
self.tss.push(ts);
self.pulses.push(pulse);
self.blobs.push(blob);
self.decomps.push(decomp);
self.scalar_types.push(scalar_type);
self.be.push(be);
self.shapes.push(shape);
self.comps.push(comp);
}
pub fn decomp(&self, i: usize) -> &[u8] {
match &self.decomps[i] {
Some(decomp) => &decomp,
None => &self.blobs[i],
}
}
}
impl FrameTypeInnerStatic for EventFull {
const FRAME_TYPE_ID: u32 = items::EVENT_FULL_FRAME_TYPE_ID;
}
impl WithLen for EventFull {
fn len(&self) -> usize {
self.tss.len()
}
}
impl Appendable for EventFull {
fn empty_like_self(&self) -> Self {
Self::empty()
}
// TODO expensive, get rid of it.
fn append(&mut self, src: &Self) {
self.tss.extend_from_slice(&src.tss);
self.pulses.extend_from_slice(&src.pulses);
self.blobs.extend_from_slice(&src.blobs);
self.decomps.extend_from_slice(&src.decomps);
self.scalar_types.extend_from_slice(&src.scalar_types);
self.be.extend_from_slice(&src.be);
self.shapes.extend_from_slice(&src.shapes);
self.comps.extend_from_slice(&src.comps);
}
fn append_zero(&mut self, _ts1: u64, _ts2: u64) {
// TODO do we still need this type?
todo!()
}
}
impl Clearable for EventFull {
fn clear(&mut self) {
self.tss.clear();
self.pulses.clear();
self.blobs.clear();
self.decomps.clear();
self.scalar_types.clear();
self.be.clear();
self.shapes.clear();
self.comps.clear();
}
}
impl WithTimestamps for EventFull {
fn ts(&self, ix: usize) -> u64 {
self.tss[ix]
}
}
impl ByteEstimate for EventFull {
fn byte_estimate(&self) -> u64 {
if self.tss.len() == 0 {
0
} else {
// TODO that is clumsy... it assumes homogenous types.
// TODO improve via a const fn on NTY
let decomp_len = self.decomps[0].as_ref().map_or(0, |h| h.len());
self.tss.len() as u64 * (40 + self.blobs[0].len() as u64 + decomp_len as u64)
}
}
}
impl PushableIndex for EventFull {
// TODO check all use cases, can't we move?
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.pulses.push(src.pulses[ix]);
self.blobs.push(src.blobs[ix].clone());
self.decomps.push(src.decomps[ix].clone());
self.scalar_types.push(src.scalar_types[ix].clone());
self.be.push(src.be[ix]);
self.shapes.push(src.shapes[ix].clone());
self.comps.push(src.comps[ix].clone());
}
}
impl Stream for EventChunker {
type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
break if self.completed {
panic!("EventChunker poll_next on completed");
} else if self.errored {
self.completed = true;
Ready(None)
} else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 {
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
};
self.parsed_bytes = 0;
let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item));
Ready(Some(Ok(ret)))
} else if self.sent_beyond_range {
self.completed = true;
Ready(None)
} else if self.final_stats_sent {
self.sent_beyond_range = true;
trace!("sent_beyond_range");
if self.seen_beyond_range {
trace!("sent_beyond_range RangeComplete");
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
trace!("sent_beyond_range non-complete");
continue 'outer;
}
} else if self.data_emit_complete {
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
};
self.parsed_bytes = 0;
let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item));
self.final_stats_sent = true;
Ready(Some(Ok(ret)))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut fcr))) => {
if false {
// TODO collect for stats:
info!("file read bytes {} ms {}", fcr.buf.len(), fcr.duration.as_millis());
}
let r = self.parse_buf(&mut fcr.buf);
match r {
Ok(res) => {
self.parsed_bytes += res.parsed_bytes;
if fcr.buf.len() > 0 {
// TODO gather stats about this:
self.inp.put_back(fcr);
}
match self.channel_config.shape {
Shape::Scalar => {
if self.need_min > 1024 * 8 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Wave(_) => {
if self.need_min > 1024 * 32 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Image(_, _) => {
if self.need_min > 1024 * 1024 * 20 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
}
let x = self.need_min;
self.inp.set_need_min(x);
if false {
info!(
"EventChunker emits {} events tss {:?}",
res.events.len(),
res.events.tss
);
};
self.item_len_emit_histo.ingest(res.events.len() as u32);
let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events));
Ready(Some(Ok(ret)))
}
Err(e) => {
self.errored = true;
Ready(Some(Err(e.into())))
}
}
}
Ready(Some(Err(e))) => {
self.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
self.data_emit_complete = true;
continue 'outer;
}
Pending => Pending,
}
};
}
}
}
#[cfg(test)]
mod test {
//use err::Error;
//use netpod::timeunits::*;
//use netpod::{ByteSize, Nanos};
/*
#[test]
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> {
let chn = netpod::Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
};
// TODO read config from disk.
let channel_config = ChannelConfig {
channel: chn,
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::big_endian(),
shape: netpod::Shape::Scalar,
array: false,
compression: false,
};
let cluster = taskrun::test_cluster();
let node = cluster.nodes[nodeix].clone();
let buffer_size = 512;
let event_chunker_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
}
*/
}
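The file removed above is a move, not a deletion: EventChunker and EventChunkerConf reappear as streams::eventchunker and EventFull as items::eventfull in the other hunks. Assuming the constructors keep the signatures shown above, a disk-side call now reads roughly like this sketch (the helper name make_chunker and the flag values are illustrative):

use std::path::PathBuf;
use std::pin::Pin;

use futures_core::Stream;
use netpod::{ByteSize, ChannelConfig, NanoRange};
use streams::eventchunker::{EventChunker, EventChunkerConf};
use streams::filechunkread::FileChunkRead;

type FileChunkStream = Pin<Box<dyn Stream<Item = Result<FileChunkRead, err::Error>> + Send>>;

// Build a chunker over a stream of file chunks, starting at an event boundary;
// the constructor signature is taken from the removed file above and assumed
// unchanged after the move.
fn make_chunker(
    file_chunks: FileChunkStream,
    channel_config: ChannelConfig,
    range: NanoRange,
    dbg_path: PathBuf,
) -> EventChunker {
    // Emit a disk-read stats item roughly every megabyte of parsed data.
    let stats_conf = EventChunkerConf::new(ByteSize::kb(1024));
    EventChunker::from_event_boundary(
        file_chunks,
        channel_config,
        range,
        stats_conf,
        dbg_path,
        false, // expand
        true,  // do_decompress
    )
}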

View File

@@ -1,2 +1 @@
pub mod inmem;
pub mod makeframe;

View File

@@ -1,287 +0,0 @@
use bytes::{BufMut, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::pin_mut;
use items::inmem::InMemoryFrame;
use items::StreamItem;
use items::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};
use netpod::log::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
/**
Interprets a byte stream as length-delimited frames.
Emits each frame as a single item. Therefore, each item must fit easily into memory.
*/
pub struct InMemoryFrameAsyncReadStream<T>
where
T: AsyncRead + Unpin,
{
inp: T,
buf: BytesMut,
bufcap: usize,
wp: usize,
tryparse: bool,
errored: bool,
completed: bool,
inp_bytes_consumed: u64,
}
impl<T> InMemoryFrameAsyncReadStream<T>
where
T: AsyncRead + Unpin,
{
pub fn new(inp: T, bufcap: usize) -> Self {
let mut t = Self {
inp,
buf: BytesMut::new(),
bufcap,
wp: 0,
tryparse: false,
errored: false,
completed: false,
inp_bytes_consumed: 0,
};
t.buf = t.empty_buf();
t
}
fn empty_buf(&self) -> BytesMut {
let mut buf = BytesMut::with_capacity(self.bufcap);
buf.resize(buf.capacity(), 0);
buf
}
fn poll_upstream(&mut self, cx: &mut Context) -> Poll<Result<usize, Error>> {
if true || self.wp > 0 {
let mut bnew = self.empty_buf();
assert!(self.buf.len() >= self.wp);
assert!(bnew.capacity() >= self.wp);
trace!(
"InMemoryFrameAsyncReadStream re-use {} bytes from previous i/o",
self.wp,
);
bnew[..].as_mut().put_slice(&self.buf[..self.wp]);
self.buf = bnew;
}
trace!("prepare read from wp {} self.buf.len() {}", self.wp, self.buf.len(),);
let gg = self.buf.len() - self.wp;
let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]);
if gg < 1 || gg > 1024 * 1024 * 40 {
use bytes::Buf;
panic!(
"have gg {} len {} cap {} rem {} rem mut {} self.wp {}",
gg,
self.buf.len(),
self.buf.capacity(),
self.buf.remaining(),
self.buf.remaining_mut(),
self.wp,
);
}
assert!(buf2.remaining() == gg);
assert!(buf2.capacity() == gg);
assert!(buf2.filled().len() == 0);
let j = &mut self.inp;
pin_mut!(j);
use Poll::*;
match AsyncRead::poll_read(j, cx, &mut buf2) {
Ready(Ok(_)) => {
let n1 = buf2.filled().len();
trace!("InMemoryFrameAsyncReadStream READ {} FROM UPSTREAM", n1);
Ready(Ok(n1))
}
Ready(Err(e)) => Ready(Err(e.into())),
Pending => Pending,
}
}
fn tryparse(
&mut self,
buf: BytesMut,
wp: usize,
) -> (Option<Option<Result<InMemoryFrame, Error>>>, BytesMut, usize) {
let nb = wp;
if nb >= INMEM_FRAME_HEAD {
let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]);
let encid = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]);
let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]);
let len = u32::from_le_bytes(*arrayref::array_ref![buf, 12, 4]);
if magic != INMEM_FRAME_MAGIC {
let z = nb.min(256);
let u = String::from_utf8_lossy(&buf[0..z]);
let e = Error::with_msg("INCORRECT MAGIC");
error!("incorrect magic buf as utf8: {:?} error: {:?}", u, e);
let msg = format!(
"InMemoryFrameAsyncReadStream tryparse incorrect magic: {} buf as utf8: {:?}",
magic, u
);
error!("{}", msg);
return (Some(Some(Err(Error::with_msg(format!("{}", msg))))), buf, wp);
}
if len == 0 {
if nb != INMEM_FRAME_HEAD + INMEM_FRAME_FOOT {
warn!("stop-frame with nb {}", nb);
}
(Some(None), buf, wp)
} else {
if len > 1024 * 1024 * 50 {
let msg = format!(
"InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}",
len, self.inp_bytes_consumed
);
error!("{}", msg);
return (Some(Some(Err(Error::with_msg(msg)))), buf, wp);
} else if len > 1024 * 1024 * 1 {
// TODO
//warn!("InMemoryFrameAsyncReadStream big len received {}", len);
}
let nl = len as usize + INMEM_FRAME_HEAD + INMEM_FRAME_FOOT;
if self.bufcap < nl {
// TODO count cases in production
let n = 2 * nl;
debug!("Adjust bufcap old {} new {}", self.bufcap, n);
self.bufcap = n;
}
if nb < nl {
(None, buf, wp)
} else {
use bytes::Buf;
let mut h = crc32fast::Hasher::new();
h.update(&buf[..(nl - INMEM_FRAME_FOOT)]);
let frame_crc = h.finalize();
let mut h = crc32fast::Hasher::new();
h.update(&buf[INMEM_FRAME_HEAD..(nl - INMEM_FRAME_FOOT)]);
let payload_crc = h.finalize();
let frame_crc_ind =
u32::from_le_bytes(*arrayref::array_ref![buf, INMEM_FRAME_HEAD + len as usize, 4]);
let payload_crc_ind = u32::from_le_bytes(*arrayref::array_ref![buf, 16, 4]);
//info!("len {}", len);
//info!("payload_crc_ind {}", payload_crc_ind);
//info!("frame_crc_ind {}", frame_crc_ind);
let payload_crc_match = payload_crc_ind == payload_crc;
let frame_crc_match = frame_crc_ind == frame_crc;
if !frame_crc_match || !payload_crc_match {
let ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]);
warn!("CRC mismatch A frame_crc_match {frame_crc_match} payload_crc_match {payload_crc_match}\n{ss:?}");
return (
Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}",
payload_crc_match, frame_crc_match,
))))),
buf,
wp,
);
}
let mut buf = buf;
let mut buf3 = buf.split_to(nl);
let buf = buf;
buf3.advance(INMEM_FRAME_HEAD);
buf3.truncate(len as usize);
let mut h = crc32fast::Hasher::new();
h.update(&buf3);
let payload_crc_2 = h.finalize();
let payload_crc_2_match = payload_crc_2 == payload_crc_ind;
if !payload_crc_2_match {
let sa = String::from_utf8_lossy(&buf[..buf.len().min(256)]);
let sb = String::from_utf8_lossy(&buf3[..buf3.len().min(256)]);
warn!("CRC mismatch B\n{sa:?}\n{sb:?}");
return (
Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse crc mismatch B {} {} {}",
payload_crc_match, frame_crc_match, payload_crc_2_match,
))))),
buf,
wp,
);
}
self.inp_bytes_consumed += nl as u64;
let ret = InMemoryFrame {
len,
tyid,
encid,
buf: buf3.freeze(),
};
(Some(Some(Ok(ret))), buf, wp - nl)
}
}
} else {
(None, buf, wp)
}
}
}
impl<T> Stream for InMemoryFrameAsyncReadStream<T>
where
T: AsyncRead + Unpin,
{
type Item = Result<StreamItem<InMemoryFrame>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
assert!(!self.completed);
if self.errored {
self.completed = true;
return Ready(None);
}
'outer: loop {
if self.tryparse {
let r = {
let buf = std::mem::replace(&mut self.buf, BytesMut::new());
let wp = self.wp;
let (r, buf, wp) = self.tryparse(buf, wp);
self.buf = buf;
self.wp = wp;
r
};
break match r {
None => {
self.tryparse = false;
continue 'outer;
}
Some(None) => {
self.tryparse = false;
self.completed = true;
Ready(None)
}
Some(Some(Ok(item))) => Ready(Some(Ok(StreamItem::DataItem(item)))),
Some(Some(Err(e))) => {
self.tryparse = false;
self.errored = true;
Ready(Some(Err(e)))
}
};
} else {
let r = self.poll_upstream(cx);
break match r {
Ready(Ok(n1)) => {
self.wp += n1;
if n1 == 0 {
let n2 = self.buf.len();
if n2 != 0 {
// TODO anything more to handle here?
debug!(
"InMemoryFrameAsyncReadStream n2 != 0 n2 {} consumed {}",
n2, self.inp_bytes_consumed
);
}
self.completed = true;
Ready(None)
} else {
self.tryparse = true;
continue 'outer;
}
}
Ready(Err(e)) => {
trace!("poll_upstream GIVES Error");
self.errored = true;
Ready(Some(Err(e.into())))
}
Pending => Pending,
};
}
}
}
}
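This frame reader also moves rather than disappears; the other hunks import it as streams::frames::inmem::InMemoryFrameAsyncReadStream. A minimal consumer sketch, assuming the constructor and item type stay as shown above (dump_frames is an illustrative helper, not from the codebase):

use futures_util::StreamExt;
use items::StreamItem;
use netpod::log::*;
use streams::frames::inmem::InMemoryFrameAsyncReadStream;
use tokio::net::tcp::OwnedReadHalf;

// Read length-delimited frames off the read half of a TCP connection and log
// them; inmem_bufcap is the initial buffer capacity (PerfOpts::inmem_bufcap at
// the call sites elsewhere in this commit).
async fn dump_frames(netin: OwnedReadHalf, inmem_bufcap: usize) -> Result<(), err::Error> {
    let mut frames = InMemoryFrameAsyncReadStream::new(netin, inmem_bufcap);
    while let Some(item) = frames.next().await {
        match item? {
            StreamItem::DataItem(frame) => {
                info!("frame tyid {} len {}", frame.tyid(), frame.buf().len());
            }
            // Log and stats items carry no frame payload; ignore them here.
            _ => {}
        }
    }
    Ok(())
}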

View File

@@ -428,7 +428,7 @@ async fn gen_event(
buf.put_u8(0);
buf.put_u8(0);
buf.put_i32(-1);
use crate::dtflags::*;
use streams::dtflags::*;
if config.compression {
match config.shape {
Shape::Wave(ele_count) => {
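gen_event now takes the flag constants from streams::dtflags. A tiny check of how they compose into the type_flags byte, assuming the moved constants keep the values removed from disk above (COMPRESSION 0x80, ARRAY 0x40, BIG_ENDIAN 0x20, SHAPE 0x10):

use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};

fn main() {
    // Compose the type_flags byte for a compressed, big-endian, shaped array
    // event, the same way gen_event writes it.
    let type_flags: u8 = COMPRESSION | ARRAY | BIG_ENDIAN | SHAPE;
    assert_eq!(type_flags, 0xf0);
    // Decompose it the way EventChunker::parse_buf_inner does.
    assert!(type_flags & COMPRESSION != 0);
    assert!(type_flags & BIG_ENDIAN != 0);
}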

View File

@@ -299,7 +299,6 @@ where
#[cfg(test)]
mod test {
use crate::dataopen::position_file_for_test;
use crate::eventchunker::{EventChunker, EventChunkerConf};
use crate::file_content_stream;
use crate::merge::MergedStream;
use err::Error;
@@ -310,6 +309,8 @@ mod test {
use netpod::timeunits::{DAY, MS};
use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape};
use std::path::PathBuf;
use streams::eventchunker::EventChunker;
use streams::eventchunker::EventChunkerConf;
fn scalar_file_path() -> PathBuf {
test_data_base_path_databuffer()

View File

@@ -1,9 +1,8 @@
use crate::eventchunker::EventFull;
use crate::merge::MergedStream;
use crate::raw::client::x_processed_event_blobs_stream_from_node;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use items::eventfull::EventFull;
use items::Sitemty;
use netpod::log::*;
use netpod::query::RawEventsQuery;
@@ -11,6 +10,7 @@ use netpod::{Cluster, PerfOpts};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use streams::tcprawclient::x_processed_event_blobs_stream_from_node;
type T001<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
type T002<T> = Pin<Box<dyn Future<Output = Result<T001<T>, Error>> + Send>>;

View File

@@ -1,5 +1,4 @@
use crate::merge::MergedStream;
use crate::raw::client::x_processed_stream_from_node;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
@@ -10,6 +9,7 @@ use netpod::{Cluster, PerfOpts};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use streams::tcprawclient::x_processed_stream_from_node;
type T001<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
type T002<T> = Pin<Box<dyn Future<Output = Result<T001<T>, Error>> + Send>>;

View File

@@ -1,3 +1 @@
pub mod client;
pub mod conn;
pub mod eventsfromframes;

View File

@@ -1,73 +0,0 @@
/*!
Delivers event data.
Delivers event data (not yet time-binned) from local storage and provides client functions
to request such data from nodes.
*/
use crate::eventchunker::EventFull;
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::raw::eventsfromframes::EventsFromFrames;
use err::Error;
use futures_core::Stream;
use items::frame::{make_frame, make_term_frame};
use items::{EventQueryJsonStringFrame, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{Node, PerfOpts};
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
pub async fn x_processed_stream_from_node<ENP>(
query: RawEventsQuery,
perf_opts: PerfOpts,
node: Node,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>>, Error>
where
ENP: EventsNodeProcessor,
<ENP as EventsNodeProcessor>::Output: Unpin + 'static,
{
debug!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw);
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
let qjs = serde_json::to_string(&query)?;
let (netin, mut netout) = net.into_split();
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(
EventQueryJsonStringFrame(qjs),
)));
let buf = make_frame(&item)?;
netout.write_all(&buf).await?;
let buf = make_term_frame()?;
netout.write_all(&buf).await?;
netout.flush().await?;
netout.forget();
let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
let items = EventsFromFrames::new(frames);
Ok(Box::pin(items))
}
pub async fn x_processed_event_blobs_stream_from_node(
query: RawEventsQuery,
perf_opts: PerfOpts,
node: Node,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
debug!(
"x_processed_event_blobs_stream_from_node to: {}:{}",
node.host, node.port_raw
);
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
let qjs = serde_json::to_string(&query)?;
let (netin, mut netout) = net.into_split();
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(
EventQueryJsonStringFrame(qjs),
)));
let buf = make_frame(&item)?;
netout.write_all(&buf).await?;
let buf = make_term_frame()?;
netout.write_all(&buf).await?;
netout.flush().await?;
netout.forget();
let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
let items = EventsFromFrames::new(frames);
Ok(Box::pin(items))
}
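Both client functions removed above reappear as streams::tcprawclient in the other hunks. A usage sketch for the blob variant, assuming the signature is unchanged after the move (count_blob_items is only an illustration of the call shape):

use futures_util::StreamExt;
use netpod::query::RawEventsQuery;
use netpod::{Node, PerfOpts};
use streams::tcprawclient::x_processed_event_blobs_stream_from_node;

// Fetch full event blobs from a single node and count the stream items.
async fn count_blob_items(query: RawEventsQuery, perf_opts: PerfOpts, node: Node) -> Result<usize, err::Error> {
    let mut stream = x_processed_event_blobs_stream_from_node(query, perf_opts, node).await?;
    let mut count = 0;
    while let Some(item) = stream.next().await {
        // Each item is a Sitemty<EventFull>; propagate transported errors.
        let _item = item?;
        count += 1;
    }
    Ok(count)
}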

View File

@@ -2,9 +2,9 @@ use crate::decode::{BigEndian, Endianness, LittleEndian};
use crate::decode::{EventValueFromBytes, EventValueShape, EventsDecodedStream, NumFromBytes};
use crate::decode::{EventValuesDim0Case, EventValuesDim1Case};
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::{EventChunkerConf, EventFull};
use err::Error;
use futures_util::{Stream, StreamExt};
use items::eventfull::EventFull;
use items::numops::{BoolNum, NumOps, StringNum};
use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
@@ -12,6 +12,7 @@ use netpod::query::RawEventsQuery;
use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry};
use std::pin::Pin;
use streams::eventchunker::EventChunkerConf;
fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>(
event_value_shape: EVS,

View File

@@ -1,85 +0,0 @@
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use futures_core::Stream;
use futures_util::StreamExt;
use items::frame::decode_frame;
use items::{FrameTypeInnerStatic, Sitemty, StreamItem};
use netpod::log::*;
use serde::de::DeserializeOwned;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;
pub struct EventsFromFrames<T, I>
where
T: AsyncRead + Unpin,
{
inp: InMemoryFrameAsyncReadStream<T>,
errored: bool,
completed: bool,
_m1: PhantomData<I>,
}
impl<T, I> EventsFromFrames<T, I>
where
T: AsyncRead + Unpin,
{
pub fn new(inp: InMemoryFrameAsyncReadStream<T>) -> Self {
Self {
inp,
errored: false,
completed: false,
_m1: PhantomData,
}
}
}
impl<T, I> Stream for EventsFromFrames<T, I>
where
T: AsyncRead + Unpin,
I: FrameTypeInnerStatic + DeserializeOwned + Unpin,
{
type Item = Sitemty<I>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if self.completed {
panic!("poll_next on completed");
} else if self.errored {
self.completed = true;
Ready(None)
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(item))) => match item {
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(frame) => match decode_frame::<Sitemty<I>>(&frame) {
Ok(item) => match item {
Ok(item) => Ready(Some(Ok(item))),
Err(e) => {
self.errored = true;
Ready(Some(Err(e)))
}
},
Err(e) => {
error!("frame payload len {} tyid {} {}", frame.buf().len(), frame.tyid(), e);
self.errored = true;
Ready(Some(Err(e)))
}
},
},
Ready(Some(Err(e))) => {
self.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
self.completed = true;
Ready(None)
}
Pending => Pending,
}
};
}
}
}
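EventsFromFrames is removed here and its new module is not visible in this excerpt; the sketch below only restates how the removed client code composed it with the frame reader, with the streams-side path for EventsFromFrames written as an explicit assumption:

use futures_core::Stream;
use items::eventfull::EventFull;
use items::Sitemty;
// Confirmed by the other hunks:
use streams::frames::inmem::InMemoryFrameAsyncReadStream;
// Assumed new path for the moved type; not visible in this excerpt:
use streams::eventsfromframes::EventsFromFrames;
use tokio::net::tcp::OwnedReadHalf;

// Frames off the socket, then typed Sitemty<EventFull> items decoded from each
// frame: the same two-stage pipeline the removed client functions used.
fn typed_items_from_socket(
    netin: OwnedReadHalf,
    inmem_bufcap: usize,
) -> impl Stream<Item = Sitemty<EventFull>> {
    let frames = InMemoryFrameAsyncReadStream::new(netin, inmem_bufcap);
    let items: EventsFromFrames<OwnedReadHalf, EventFull> = EventsFromFrames::new(frames);
    items
}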