From fb78f1887e3d0a314e3cb6ed2ce6e398485d5f1c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 15 Nov 2022 11:07:40 +0100 Subject: [PATCH] Move some types to crate streams --- daqbufp2/Cargo.toml | 1 + daqbufp2/src/client.rs | 2 +- daqbufp2/src/test/binnedbinary.rs | 2 +- daqbufp2/src/test/events.rs | 2 +- disk/Cargo.toml | 4 +- disk/src/aggtest.rs | 2 +- disk/src/binned/binnedfrompbv.rs | 2 +- disk/src/decode.rs | 2 +- disk/src/disk.rs | 176 +--- disk/src/eventblobs.rs | 9 +- disk/src/eventchunker.rs | 755 ------------------ disk/src/frame.rs | 1 - disk/src/gen.rs | 2 +- disk/src/merge.rs | 3 +- disk/src/merge/mergedblobsfromremotes.rs | 4 +- disk/src/merge/mergedfromremotes.rs | 2 +- disk/src/raw.rs | 2 - disk/src/raw/conn.rs | 3 +- dq/Cargo.toml | 1 + dq/src/bin/dq.rs | 8 +- httpret/Cargo.toml | 1 + httpret/src/api1.rs | 9 +- items/Cargo.toml | 1 + items/src/eventfull.rs | 175 ++++ items/src/items.rs | 1 + nodenet/Cargo.toml | 1 + nodenet/src/conn.rs | 8 +- streams/Cargo.toml | 6 + streams/src/dtflags.rs | 4 + streams/src/eventchunker.rs | 592 ++++++++++++++ streams/src/filechunkread.rs | 55 ++ streams/src/frames.rs | 2 + .../src/frames}/eventsfromframes.rs | 2 +- .../src/frame => streams/src/frames}/inmem.rs | 0 streams/src/lib.rs | 6 + streams/src/needminbuffer.rs | 104 +++ .../client.rs => streams/src/tcprawclient.rs | 6 +- 37 files changed, 1002 insertions(+), 954 deletions(-) create mode 100644 items/src/eventfull.rs create mode 100644 streams/src/dtflags.rs create mode 100644 streams/src/eventchunker.rs create mode 100644 streams/src/filechunkread.rs create mode 100644 streams/src/frames.rs rename {disk/src/raw => streams/src/frames}/eventsfromframes.rs (98%) rename {disk/src/frame => streams/src/frames}/inmem.rs (100%) create mode 100644 streams/src/needminbuffer.rs rename disk/src/raw/client.rs => streams/src/tcprawclient.rs (94%) diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml index f0b65b1..f6b40ae 100644 --- a/daqbufp2/Cargo.toml +++ b/daqbufp2/Cargo.toml @@ -29,3 +29,4 @@ netpod = { path = "../netpod" } httpret = { path = "../httpret" } disk = { path = "../disk" } items = { path = "../items" } +streams = { path = "../streams" } diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index fd050cb..ca055b5 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -1,6 +1,5 @@ use crate::err::ErrConv; use chrono::{DateTime, Utc}; -use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; use futures_util::TryStreamExt; @@ -11,6 +10,7 @@ use items::{Sitemty, StreamItem}; use netpod::query::{BinnedQuery, CacheUsage}; use netpod::{log::*, AppendToUrl}; use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET}; +use streams::frames::inmem::InMemoryFrameAsyncReadStream; use url::Url; pub async fn status(host: String, port: u16) -> Result<(), Error> { diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 7f36f8f..f0e9c01 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -1,7 +1,6 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; -use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; use futures_util::{StreamExt, TryStreamExt}; @@ -15,6 +14,7 @@ use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET use serde::de::DeserializeOwned; use std::fmt; use std::future::ready; +use 
streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncRead; use url::Url; diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 934f261..1073279 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -1,7 +1,6 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; -use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; use futures_util::{StreamExt, TryStreamExt}; @@ -16,6 +15,7 @@ use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_J use serde_json::Value as JsonValue; use std::fmt::Debug; use std::future::ready; +use streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncRead; use url::Url; diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 34493bb..37f23ab 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -13,13 +13,13 @@ serde_json = "1.0" serde_cbor = "0.11.1" http = "0.2" chrono = { version = "0.4.19", features = ["serde"] } -tokio = { version = "1.11.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } tokio-stream = {version = "0.1.5", features = ["fs"]} hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } async-channel = "1.6" crossbeam = "0.8" bytes = "1.0.1" -crc32fast = "1.2.1" +crc32fast = "1.3.2" arrayref = "0.3.6" byteorder = "1.4.3" futures-core = "0.3.14" diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 8ab4a03..95579a5 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,7 +1,7 @@ use crate::eventblobs::EventChunkerMultifile; -use crate::eventchunker::EventChunkerConf; use netpod::{test_data_base_path_databuffer, timeunits::*, SfDatabuffer}; use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape}; +use streams::eventchunker::EventChunkerConf; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index f81884e..9368882 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -1,7 +1,6 @@ use crate::agg::binnedt::TBinnerStream; use crate::binned::query::PreBinnedQuery; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead}; -use crate::frame::inmem::InMemoryFrameAsyncReadStream; use err::Error; use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; @@ -19,6 +18,7 @@ use std::marker::PhantomData; use std::pin::Pin; use std::str::FromStr; use std::task::{Context, Poll}; +use streams::frames::inmem::InMemoryFrameAsyncReadStream; use url::Url; pub struct FetchedPreBinned { diff --git a/disk/src/decode.rs b/disk/src/decode.rs index f7e078b..c0876be 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,9 +1,9 @@ use crate::agg::enp::Identity; use crate::eventblobs::EventChunkerMultifile; -use crate::eventchunker::EventFull; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::eventfull::EventFull; use items::eventsitem::EventsItem; use items::numops::{BoolNum, NumOps, StringNum}; use items::plainevents::{PlainEvents, ScalarPlainEvents}; diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 5c5fcbc..a0b0759 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -19,24 +19,25 @@ pub mod read3; pub mod read4; pub mod streamlog; -use 
crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; use bytes::{Bytes, BytesMut}; use err::Error; use futures_core::Stream; use futures_util::future::FusedFuture; -use futures_util::{FutureExt, StreamExt, TryFutureExt}; -use netpod::histo::HistoLog2; -use netpod::{log::*, ReadSys}; +use futures_util::{FutureExt, TryFutureExt}; +use netpod::log::*; +use netpod::ReadSys; use netpod::{ChannelConfig, DiskIoTune, Node, Shape}; use std::collections::VecDeque; use std::future::Future; use std::io::SeekFrom; +use std::mem; use std::os::unix::prelude::AsRawFd; use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; -use std::{fmt, mem}; +use std::time::Instant; +use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; +use streams::filechunkread::FileChunkRead; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, ReadBuf}; use tokio::sync::mpsc; @@ -150,27 +151,6 @@ impl FusedFuture for Fopen1 { unsafe impl Send for Fopen1 {} -pub struct FileChunkRead { - buf: BytesMut, - duration: Duration, -} - -impl FileChunkRead { - pub fn into_buf(self) -> BytesMut { - self.buf - } -} - -impl fmt::Debug for FileChunkRead { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("FileChunkRead") - .field("buf.len", &self.buf.len()) - .field("buf.cap", &self.buf.capacity()) - .field("duration", &self.duration) - .finish() - } -} - pub struct FileContentStream { file: File, disk_io_tune: DiskIoTune, @@ -229,17 +209,11 @@ impl Stream for FileContentStream { self.read_going = false; let ts2 = Instant::now(); if nread == 0 { - let ret = FileChunkRead { - buf, - duration: ts2.duration_since(self.ts1), - }; + let ret = FileChunkRead::with_buf_dur(buf, ts2.duration_since(self.ts1)); self.done = true; Ready(Some(Ok(ret))) } else { - let ret = FileChunkRead { - buf, - duration: ts2.duration_since(self.ts1), - }; + let ret = FileChunkRead::with_buf_dur(buf, ts2.duration_since(self.ts1)); if false && self.nlog < 6 { self.nlog += 1; info!("{:?} ret {:?}", self.disk_io_tune, ret); @@ -316,12 +290,8 @@ impl Stream for FileContentStream2 { } FCS2::Reading((ref mut buf, ref mut fut)) => match fut.poll_unpin(cx) { Ready(Ok(n)) => { - let mut buf2 = BytesMut::new(); - std::mem::swap(buf as &mut BytesMut, &mut buf2); - let item = FileChunkRead { - buf: buf2, - duration: Duration::from_millis(0), - }; + let buf2 = std::mem::replace(buf as &mut BytesMut, BytesMut::new()); + let item = FileChunkRead::with_buf(buf2); if n == 0 { self.done = true; } else { @@ -429,17 +399,11 @@ impl Stream for FileContentStream3 { FCS3::ReadingSimple => match self.read_fut.poll_unpin(cx) { Ready(Ok(res)) => { if res.eof { - let item = FileChunkRead { - buf: res.buf, - duration: Duration::from_millis(0), - }; + let item = FileChunkRead::with_buf(res.buf); self.done = true; Ready(Some(Ok(item))) } else { - let item = FileChunkRead { - buf: res.buf, - duration: Duration::from_millis(0), - }; + let item = FileChunkRead::with_buf(res.buf); let fd = self.file.as_raw_fd(); let count = self.disk_io_tune.read_buffer_len as u64; self.read_fut = Box::pin(read3::Read3::get().read(fd, self.file_pos, count)); @@ -489,10 +453,7 @@ impl Stream for FileContentStream3 { self.eof = true; } } - let res = FileChunkRead { - buf: rr.buf, - duration: Duration::from_millis(0), - }; + let res = FileChunkRead::with_buf(rr.buf); Ready(Some(Ok(res))) } Err(e) => { @@ -619,10 +580,7 @@ impl Stream for FileContentStream4 { >(rm) }; self.recv_fut = 
Box::pin(rm.recv()) as _; - let item = FileChunkRead { - buf: k.buf, - duration: Duration::from_millis(0), - }; + let item = FileChunkRead::with_buf(k.buf); Ready(Some(Ok(item))) } Err(e) => { @@ -669,110 +627,6 @@ pub fn file_content_stream( } } -pub struct NeedMinBuffer { - inp: Pin> + Send>>, - need_min: u32, - left: Option, - buf_len_histo: HistoLog2, - errored: bool, - completed: bool, -} - -impl NeedMinBuffer { - pub fn new(inp: Pin> + Send>>) -> Self { - Self { - inp: inp, - need_min: 1, - left: None, - buf_len_histo: HistoLog2::new(8), - errored: false, - completed: false, - } - } - - pub fn put_back(&mut self, buf: FileChunkRead) { - assert!(self.left.is_none()); - self.left = Some(buf); - } - - pub fn set_need_min(&mut self, need_min: u32) { - self.need_min = need_min; - } -} - -// TODO collect somewhere else -impl Drop for NeedMinBuffer { - fn drop(&mut self) { - debug!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo); - } -} - -impl Stream for NeedMinBuffer { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - loop { - break if self.completed { - panic!("NeedMinBuffer poll_next on completed"); - } else if self.errored { - self.completed = true; - return Ready(None); - } else { - match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(fcr))) => { - self.buf_len_histo.ingest(fcr.buf.len() as u32); - //info!("NeedMinBuffer got buf len {}", fcr.buf.len()); - match self.left.take() { - Some(mut lfcr) => { - // TODO measure: - lfcr.buf.unsplit(fcr.buf); - lfcr.duration += fcr.duration; - let fcr = lfcr; - if fcr.buf.len() as u32 >= self.need_min { - //info!("with left ready len {} need_min {}", buf.len(), self.need_min); - Ready(Some(Ok(fcr))) - } else { - //info!("with left not enough len {} need_min {}", buf.len(), self.need_min); - self.left.replace(fcr); - continue; - } - } - None => { - if fcr.buf.len() as u32 >= self.need_min { - //info!("simply ready len {} need_min {}", buf.len(), self.need_min); - Ready(Some(Ok(fcr))) - } else { - //info!("no previous leftover, need more len {} need_min {}", buf.len(), self.need_min); - self.left.replace(fcr); - continue; - } - } - } - } - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e.into()))) - } - Ready(None) => { - // TODO collect somewhere - debug!("NeedMinBuffer histo: {:?}", self.buf_len_histo); - Ready(None) - } - Pending => Pending, - } - }; - } - } -} - -pub mod dtflags { - pub const COMPRESSION: u8 = 0x80; - pub const ARRAY: u8 = 0x40; - pub const BIG_ENDIAN: u8 = 0x20; - pub const SHAPE: u8 = 0x10; -} - trait ChannelConfigExt { fn dtflags(&self) -> u8; } diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index b65af39..30cbbed 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,15 +1,16 @@ use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet}; -use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull}; use crate::merge::MergedStream; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::eventfull::EventFull; use items::{LogItem, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ChannelConfig, DiskIoTune, NanoRange, Node}; use std::pin::Pin; use std::task::{Context, Poll}; +use streams::eventchunker::{EventChunker, EventChunkerConf}; use streams::rangefilter::RangeFilter; pub trait InputTraits: Stream> {} @@ -240,13 +241,15 @@ impl Stream for EventChunkerMultifile { #[cfg(test)] mod test { - use 
crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf}; + use crate::eventblobs::EventChunkerMultifile; use err::Error; use futures_util::StreamExt; use items::{RangeCompletableItem, StreamItem}; + use netpod::log::*; use netpod::timeunits::{DAY, MS}; - use netpod::{log::*, DiskIoTune}; + use netpod::DiskIoTune; use netpod::{ByteSize, ChannelConfig, Nanos}; + use streams::eventchunker::EventChunkerConf; use streams::rangefilter::RangeFilter; fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec), Error> { diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 9f8f90f..8b13789 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -1,756 +1 @@ -use crate::{FileChunkRead, NeedMinBuffer}; -use bitshuffle::bitshuffle_decompress; -use bytes::{Buf, BytesMut}; -use err::Error; -use futures_util::{Stream, StreamExt}; -use items::{ - Appendable, ByteEstimate, Clearable, FrameTypeInnerStatic, PushableIndex, RangeCompletableItem, StatsItem, - StreamItem, WithLen, WithTimestamps, -}; -use netpod::histo::HistoLog2; -use netpod::log::*; -use netpod::timeunits::SEC; -use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape}; -use parse::channelconfig::CompressionMethod; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::path::PathBuf; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Instant; -pub struct EventChunker { - inp: NeedMinBuffer, - state: DataFileState, - need_min: u32, - channel_config: ChannelConfig, - errored: bool, - completed: bool, - range: NanoRange, - stats_conf: EventChunkerConf, - seen_beyond_range: bool, - sent_beyond_range: bool, - data_emit_complete: bool, - final_stats_sent: bool, - parsed_bytes: u64, - dbg_path: PathBuf, - max_ts: u64, - expand: bool, - do_decompress: bool, - decomp_dt_histo: HistoLog2, - item_len_emit_histo: HistoLog2, - seen_before_range_count: usize, - seen_after_range_count: usize, - unordered_warn_count: usize, - repeated_ts_warn_count: usize, -} - -impl Drop for EventChunker { - fn drop(&mut self) { - // TODO collect somewhere - debug!( - "EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}", - self.decomp_dt_histo, self.item_len_emit_histo - ); - } -} - -enum DataFileState { - FileHeader, - Event, -} - -struct ParseResult { - events: EventFull, - parsed_bytes: u64, -} - -#[derive(Clone, Debug)] -pub struct EventChunkerConf { - pub disk_stats_every: ByteSize, -} - -impl EventChunkerConf { - pub fn new(disk_stats_every: ByteSize) -> Self { - Self { disk_stats_every } - } -} - -impl EventChunker { - // TODO `expand` flag usage - pub fn from_start( - inp: Pin> + Send>>, - channel_config: ChannelConfig, - range: NanoRange, - stats_conf: EventChunkerConf, - dbg_path: PathBuf, - expand: bool, - do_decompress: bool, - ) -> Self { - trace!("EventChunker::from_start"); - let mut inp = NeedMinBuffer::new(inp); - inp.set_need_min(6); - Self { - inp, - state: DataFileState::FileHeader, - need_min: 6, - channel_config, - errored: false, - completed: false, - range, - stats_conf, - seen_beyond_range: false, - sent_beyond_range: false, - data_emit_complete: false, - final_stats_sent: false, - parsed_bytes: 0, - dbg_path, - max_ts: 0, - expand, - do_decompress, - decomp_dt_histo: HistoLog2::new(8), - item_len_emit_histo: HistoLog2::new(0), - seen_before_range_count: 0, - seen_after_range_count: 0, - unordered_warn_count: 0, - repeated_ts_warn_count: 0, - } - } - - // TODO `expand` flag usage 
- pub fn from_event_boundary( - inp: Pin> + Send>>, - channel_config: ChannelConfig, - range: NanoRange, - stats_conf: EventChunkerConf, - dbg_path: PathBuf, - expand: bool, - do_decompress: bool, - ) -> Self { - let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress); - ret.state = DataFileState::Event; - ret.need_min = 4; - ret.inp.set_need_min(4); - ret - } - - fn parse_buf(&mut self, buf: &mut BytesMut) -> Result { - span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf)) - } - - fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { - let mut ret = EventFull::empty(); - let mut parsed_bytes = 0; - use byteorder::{ReadBytesExt, BE}; - loop { - if (buf.len() as u32) < self.need_min { - break; - } - match self.state { - DataFileState::FileHeader => { - if buf.len() < 6 { - Err(Error::with_msg("need min 6 for FileHeader"))?; - } - let mut sl = std::io::Cursor::new(buf.as_ref()); - let fver = sl.read_i16::().unwrap(); - if fver != 0 { - Err(Error::with_msg("unexpected data file version"))?; - } - let len = sl.read_i32::().unwrap(); - if len <= 0 || len >= 128 { - Err(Error::with_msg("large channel header len"))?; - } - let totlen = len as usize + 2; - if buf.len() < totlen { - self.need_min = totlen as u32; - break; - } else { - sl.advance(len as usize - 8); - let len2 = sl.read_i32::().unwrap(); - if len != len2 { - Err(Error::with_msg("channel header len mismatch"))?; - } - String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?; - self.state = DataFileState::Event; - self.need_min = 4; - buf.advance(totlen); - parsed_bytes += totlen as u64; - } - } - DataFileState::Event => { - let p0 = 0; - let mut sl = std::io::Cursor::new(buf.as_ref()); - let len = sl.read_i32::().unwrap(); - if len < 20 || len > 1024 * 1024 * 20 { - Err(Error::with_msg("unexpected large event chunk"))?; - } - let len = len as u32; - if (buf.len() as u32) < len { - self.need_min = len as u32; - break; - } else { - let mut sl = std::io::Cursor::new(buf.as_ref()); - let len1b = sl.read_i32::().unwrap(); - assert!(len == len1b as u32); - let _ttl = sl.read_i64::().unwrap(); - let ts = sl.read_i64::().unwrap() as u64; - let pulse = sl.read_i64::().unwrap() as u64; - if ts == self.max_ts { - if self.repeated_ts_warn_count < 20 { - let msg = format!( - "EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", - self.repeated_ts_warn_count, - ts / SEC, - ts % SEC, - self.max_ts / SEC, - self.max_ts % SEC, - self.channel_config.shape, - self.dbg_path - ); - warn!("{}", msg); - self.repeated_ts_warn_count += 1; - } - } - if ts < self.max_ts { - if self.unordered_warn_count < 20 { - let msg = format!( - "EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", - self.unordered_warn_count, - ts / SEC, - ts % SEC, - self.max_ts / SEC, - self.max_ts % SEC, - self.channel_config.shape, - self.dbg_path - ); - warn!("{}", msg); - self.unordered_warn_count += 1; - let e = Error::with_public_msg_no_trace(msg); - return Err(e); - } - } - self.max_ts = ts; - if ts >= self.range.end { - self.seen_after_range_count += 1; - if !self.expand || self.seen_after_range_count >= 2 { - self.seen_beyond_range = true; - self.data_emit_complete = true; - break; - } - } - if ts < self.range.beg { - self.seen_before_range_count += 1; - if self.seen_before_range_count > 1 { - let msg = format!( - "seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}", - ts / SEC, - 
ts % SEC, - self.range.beg / SEC, - self.range.beg % SEC, - self.range.end / SEC, - self.range.end % SEC, - pulse, - self.channel_config.shape, - self.dbg_path - ); - warn!("{}", msg); - let e = Error::with_public_msg(msg); - Err(e)?; - } - } - let _ioc_ts = sl.read_i64::().unwrap(); - let status = sl.read_i8().unwrap(); - let severity = sl.read_i8().unwrap(); - let optional = sl.read_i32::().unwrap(); - if status != 0 { - Err(Error::with_msg(format!("status != 0: {}", status)))?; - } - if severity != 0 { - Err(Error::with_msg(format!("severity != 0: {}", severity)))?; - } - if optional != -1 { - Err(Error::with_msg(format!("optional != -1: {}", optional)))?; - } - let type_flags = sl.read_u8().unwrap(); - let type_index = sl.read_u8().unwrap(); - if type_index > 13 { - Err(Error::with_msg(format!("type_index: {}", type_index)))?; - } - let scalar_type = ScalarType::from_dtype_index(type_index)?; - use super::dtflags::*; - let is_compressed = type_flags & COMPRESSION != 0; - let is_array = type_flags & ARRAY != 0; - let is_big_endian = type_flags & BIG_ENDIAN != 0; - let is_shaped = type_flags & SHAPE != 0; - if let Shape::Wave(_) = self.channel_config.shape { - if !is_array { - Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?; - } - } - let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 }; - let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 }; - assert!(compression_method <= 0); - assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2)); - let mut shape_lens = [0, 0, 0, 0]; - for i1 in 0..shape_dim { - shape_lens[i1 as usize] = sl.read_u32::().unwrap(); - } - let shape_this = { - if is_shaped { - if shape_dim == 1 { - Shape::Wave(shape_lens[0]) - } else if shape_dim == 2 { - Shape::Image(shape_lens[0], shape_lens[1]) - } else { - err::todoval() - } - } else { - Shape::Scalar - } - }; - let comp_this = if is_compressed { - if compression_method == 0 { - Some(CompressionMethod::BitshuffleLZ4) - } else { - err::todoval() - } - } else { - None - }; - let p1 = sl.position(); - let k1 = len as u64 - (p1 - p0) - 4; - if is_compressed { - //debug!("event ts {} is_compressed {}", ts, is_compressed); - let value_bytes = sl.read_u64::().unwrap(); - let block_size = sl.read_u32::().unwrap(); - //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size); - match self.channel_config.shape { - Shape::Scalar => { - assert!(value_bytes < 1024 * 1); - } - Shape::Wave(_) => { - assert!(value_bytes < 1024 * 64); - } - Shape::Image(_, _) => { - assert!(value_bytes < 1024 * 1024 * 20); - } - } - assert!(block_size <= 1024 * 32); - let type_size = scalar_type.bytes() as u32; - let ele_count = value_bytes / type_size as u64; - let ele_size = type_size; - match self.channel_config.shape { - Shape::Scalar => { - if is_array { - Err(Error::with_msg(format!( - "ChannelConfig expects Scalar but we find event is_array" - )))?; - } - } - Shape::Wave(dim1count) => { - if dim1count != ele_count as u32 { - Err(Error::with_msg(format!( - "ChannelConfig expects {:?} but event has ele_count {}", - self.channel_config.shape, ele_count, - )))?; - } - } - Shape::Image(n1, n2) => { - let nt = n1 as usize * n2 as usize; - if nt != ele_count as usize { - Err(Error::with_msg(format!( - "ChannelConfig expects {:?} but event has ele_count {}", - self.channel_config.shape, ele_count, - )))?; - } - } - } - let decomp = { - if self.do_decompress { - 
let ts1 = Instant::now(); - let decomp_bytes = (type_size * ele_count as u32) as usize; - let mut decomp = BytesMut::with_capacity(decomp_bytes); - unsafe { - decomp.set_len(decomp_bytes); - } - // TODO limit the buf slice range - match bitshuffle_decompress( - &buf.as_ref()[(p1 as usize + 12)..(p1 as usize + k1 as usize)], - &mut decomp, - ele_count as usize, - ele_size as usize, - 0, - ) { - Ok(c1) => { - assert!(c1 as u64 + 12 == k1); - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1); - self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros()); - Some(decomp) - } - Err(e) => { - return Err(Error::with_msg(format!("decompression failed {:?}", e)))?; - } - } - } else { - None - } - }; - ret.add_event( - ts, - pulse, - buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(), - decomp, - ScalarType::from_dtype_index(type_index)?, - is_big_endian, - shape_this, - comp_this, - ); - } else { - if len < p1 as u32 + 4 { - let msg = format!("uncomp len: {} p1: {}", len, p1); - Err(Error::with_msg(msg))?; - } - let vlen = len - p1 as u32 - 4; - // TODO in this case, decomp and comp is the same and not needed. - let decomp = BytesMut::from(&buf[p1 as usize..(p1 as u32 + vlen) as usize]); - ret.add_event( - ts, - pulse, - buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(), - Some(decomp), - ScalarType::from_dtype_index(type_index)?, - is_big_endian, - shape_this, - comp_this, - ); - } - buf.advance(len as usize); - parsed_bytes += len as u64; - self.need_min = 4; - } - } - } - } - Ok(ParseResult { - events: ret, - parsed_bytes, - }) - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct EventFull { - pub tss: Vec, - pub pulses: Vec, - pub blobs: Vec>, - #[serde(serialize_with = "decomps_ser", deserialize_with = "decomps_de")] - // TODO allow access to `decomps` via method which checks first if `blobs` is already the decomp. 
- pub decomps: Vec>, - pub scalar_types: Vec, - pub be: Vec, - pub shapes: Vec, - pub comps: Vec>, -} - -fn decomps_ser(t: &Vec>, s: S) -> Result -where - S: Serializer, -{ - let a: Vec<_> = t - .iter() - .map(|k| match k { - None => None, - Some(j) => Some(j[..].to_vec()), - }) - .collect(); - Serialize::serialize(&a, s) -} - -fn decomps_de<'de, D>(d: D) -> Result>, D::Error> -where - D: Deserializer<'de>, -{ - let a: Vec>> = Deserialize::deserialize(d)?; - let a = a - .iter() - .map(|k| match k { - None => None, - Some(j) => { - let mut a = BytesMut::new(); - a.extend_from_slice(&j); - Some(a) - } - }) - .collect(); - Ok(a) -} - -impl EventFull { - pub fn empty() -> Self { - Self { - tss: vec![], - pulses: vec![], - blobs: vec![], - decomps: vec![], - scalar_types: vec![], - be: vec![], - shapes: vec![], - comps: vec![], - } - } - - fn add_event( - &mut self, - ts: u64, - pulse: u64, - blob: Vec, - decomp: Option, - scalar_type: ScalarType, - be: bool, - shape: Shape, - comp: Option, - ) { - self.tss.push(ts); - self.pulses.push(pulse); - self.blobs.push(blob); - self.decomps.push(decomp); - self.scalar_types.push(scalar_type); - self.be.push(be); - self.shapes.push(shape); - self.comps.push(comp); - } - - pub fn decomp(&self, i: usize) -> &[u8] { - match &self.decomps[i] { - Some(decomp) => &decomp, - None => &self.blobs[i], - } - } -} - -impl FrameTypeInnerStatic for EventFull { - const FRAME_TYPE_ID: u32 = items::EVENT_FULL_FRAME_TYPE_ID; -} - -impl WithLen for EventFull { - fn len(&self) -> usize { - self.tss.len() - } -} - -impl Appendable for EventFull { - fn empty_like_self(&self) -> Self { - Self::empty() - } - - // TODO expensive, get rid of it. - fn append(&mut self, src: &Self) { - self.tss.extend_from_slice(&src.tss); - self.pulses.extend_from_slice(&src.pulses); - self.blobs.extend_from_slice(&src.blobs); - self.decomps.extend_from_slice(&src.decomps); - self.scalar_types.extend_from_slice(&src.scalar_types); - self.be.extend_from_slice(&src.be); - self.shapes.extend_from_slice(&src.shapes); - self.comps.extend_from_slice(&src.comps); - } - - fn append_zero(&mut self, _ts1: u64, _ts2: u64) { - // TODO do we still need this type? - todo!() - } -} - -impl Clearable for EventFull { - fn clear(&mut self) { - self.tss.clear(); - self.pulses.clear(); - self.blobs.clear(); - self.decomps.clear(); - self.scalar_types.clear(); - self.be.clear(); - self.shapes.clear(); - self.comps.clear(); - } -} - -impl WithTimestamps for EventFull { - fn ts(&self, ix: usize) -> u64 { - self.tss[ix] - } -} - -impl ByteEstimate for EventFull { - fn byte_estimate(&self) -> u64 { - if self.tss.len() == 0 { - 0 - } else { - // TODO that is clumsy... it assumes homogenous types. - // TODO improve via a const fn on NTY - let decomp_len = self.decomps[0].as_ref().map_or(0, |h| h.len()); - self.tss.len() as u64 * (40 + self.blobs[0].len() as u64 + decomp_len as u64) - } - } -} - -impl PushableIndex for EventFull { - // TODO check all use cases, can't we move? 
- fn push_index(&mut self, src: &Self, ix: usize) { - self.tss.push(src.tss[ix]); - self.pulses.push(src.pulses[ix]); - self.blobs.push(src.blobs[ix].clone()); - self.decomps.push(src.decomps[ix].clone()); - self.scalar_types.push(src.scalar_types[ix].clone()); - self.be.push(src.be[ix]); - self.shapes.push(src.shapes[ix].clone()); - self.comps.push(src.comps[ix].clone()); - } -} - -impl Stream for EventChunker { - type Item = Result>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("EventChunker poll_next on completed"); - } else if self.errored { - self.completed = true; - Ready(None) - } else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 { - let item = EventDataReadStats { - parsed_bytes: self.parsed_bytes, - }; - self.parsed_bytes = 0; - let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item)); - Ready(Some(Ok(ret))) - } else if self.sent_beyond_range { - self.completed = true; - Ready(None) - } else if self.final_stats_sent { - self.sent_beyond_range = true; - trace!("sent_beyond_range"); - if self.seen_beyond_range { - trace!("sent_beyond_range RangeComplete"); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } else { - trace!("sent_beyond_range non-complete"); - continue 'outer; - } - } else if self.data_emit_complete { - let item = EventDataReadStats { - parsed_bytes: self.parsed_bytes, - }; - self.parsed_bytes = 0; - let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item)); - self.final_stats_sent = true; - Ready(Some(Ok(ret))) - } else { - match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(mut fcr))) => { - if false { - // TODO collect for stats: - info!("file read bytes {} ms {}", fcr.buf.len(), fcr.duration.as_millis()); - } - let r = self.parse_buf(&mut fcr.buf); - match r { - Ok(res) => { - self.parsed_bytes += res.parsed_bytes; - if fcr.buf.len() > 0 { - // TODO gather stats about this: - self.inp.put_back(fcr); - } - match self.channel_config.shape { - Shape::Scalar => { - if self.need_min > 1024 * 8 { - let msg = - format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - return Ready(Some(Err(Error::with_msg(msg)))); - } - } - Shape::Wave(_) => { - if self.need_min > 1024 * 32 { - let msg = - format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - return Ready(Some(Err(Error::with_msg(msg)))); - } - } - Shape::Image(_, _) => { - if self.need_min > 1024 * 1024 * 20 { - let msg = - format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - return Ready(Some(Err(Error::with_msg(msg)))); - } - } - } - let x = self.need_min; - self.inp.set_need_min(x); - if false { - info!( - "EventChunker emits {} events tss {:?}", - res.events.len(), - res.events.tss - ); - }; - self.item_len_emit_histo.ingest(res.events.len() as u32); - let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events)); - Ready(Some(Ok(ret))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e.into()))) - } - } - } - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - self.data_emit_complete = true; - continue 'outer; - } - Pending => Pending, - } - }; - } - } -} - -#[cfg(test)] -mod test { - //use err::Error; - //use netpod::timeunits::*; - //use netpod::{ByteSize, Nanos}; - - /* - #[test] - fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> 
Result<(usize, usize), Error> { - let chn = netpod::Channel { - backend: "testbackend".into(), - name: "scalar-i32-be".into(), - }; - // TODO read config from disk. - let channel_config = ChannelConfig { - channel: chn, - keyspace: 2, - time_bin_size: Nanos { ns: DAY }, - scalar_type: netpod::ScalarType::I32, - byte_order: netpod::ByteOrder::big_endian(), - shape: netpod::Shape::Scalar, - array: false, - compression: false, - }; - let cluster = taskrun::test_cluster(); - let node = cluster.nodes[nodeix].clone(); - let buffer_size = 512; - let event_chunker_conf = EventChunkerConf { - disk_stats_every: ByteSize::kb(1024), - }; - } - */ -} diff --git a/disk/src/frame.rs b/disk/src/frame.rs index 8ffca53..5ac629e 100644 --- a/disk/src/frame.rs +++ b/disk/src/frame.rs @@ -1,2 +1 @@ -pub mod inmem; pub mod makeframe; diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 74c81ed..665dfff 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -428,7 +428,7 @@ async fn gen_event( buf.put_u8(0); buf.put_u8(0); buf.put_i32(-1); - use crate::dtflags::*; + use streams::dtflags::*; if config.compression { match config.shape { Shape::Wave(ele_count) => { diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 2202c76..cd6fe9d 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -299,7 +299,6 @@ where #[cfg(test)] mod test { use crate::dataopen::position_file_for_test; - use crate::eventchunker::{EventChunker, EventChunkerConf}; use crate::file_content_stream; use crate::merge::MergedStream; use err::Error; @@ -310,6 +309,8 @@ mod test { use netpod::timeunits::{DAY, MS}; use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape}; use std::path::PathBuf; + use streams::eventchunker::EventChunker; + use streams::eventchunker::EventChunkerConf; fn scalar_file_path() -> PathBuf { test_data_base_path_databuffer() diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index ba71160..331d032 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -1,9 +1,8 @@ -use crate::eventchunker::EventFull; use crate::merge::MergedStream; -use crate::raw::client::x_processed_event_blobs_stream_from_node; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; +use items::eventfull::EventFull; use items::Sitemty; use netpod::log::*; use netpod::query::RawEventsQuery; @@ -11,6 +10,7 @@ use netpod::{Cluster, PerfOpts}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use streams::tcprawclient::x_processed_event_blobs_stream_from_node; type T001 = Pin> + Send>>; type T002 = Pin, Error>> + Send>>; diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs index cd13f61..0fb8fbb 100644 --- a/disk/src/merge/mergedfromremotes.rs +++ b/disk/src/merge/mergedfromremotes.rs @@ -1,5 +1,4 @@ use crate::merge::MergedStream; -use crate::raw::client::x_processed_stream_from_node; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; @@ -10,6 +9,7 @@ use netpod::{Cluster, PerfOpts}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use streams::tcprawclient::x_processed_stream_from_node; type T001 = Pin> + Send>>; type T002 = Pin, Error>> + Send>>; diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 62ea3d7..80ac2a1 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -1,3 +1 @@ -pub mod client; pub mod conn; -pub mod eventsfromframes; diff --git a/disk/src/raw/conn.rs 
b/disk/src/raw/conn.rs index 3dcf8f5..068ec7e 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -2,9 +2,9 @@ use crate::decode::{BigEndian, Endianness, LittleEndian}; use crate::decode::{EventValueFromBytes, EventValueShape, EventsDecodedStream, NumFromBytes}; use crate::decode::{EventValuesDim0Case, EventValuesDim1Case}; use crate::eventblobs::EventChunkerMultifile; -use crate::eventchunker::{EventChunkerConf, EventFull}; use err::Error; use futures_util::{Stream, StreamExt}; +use items::eventfull::EventFull; use items::numops::{BoolNum, NumOps, StringNum}; use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; @@ -12,6 +12,7 @@ use netpod::query::RawEventsQuery; use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry}; use std::pin::Pin; +use streams::eventchunker::EventChunkerConf; fn make_num_pipeline_stream_evs( event_value_shape: EVS, diff --git a/dq/Cargo.toml b/dq/Cargo.toml index d9d6bfd..4b58ef5 100644 --- a/dq/Cargo.toml +++ b/dq/Cargo.toml @@ -21,3 +21,4 @@ netpod = { path = "../netpod" } items = { path = "../items" } parse = { path = "../parse" } disk = { path = "../disk" } +streams = { path = "../streams" } diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index a065df8..966e4b4 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -1,13 +1,9 @@ use clap::Parser; -//use disk::decode::EventValueShape; -//use disk::decode::EventValuesDim0Case; -use disk::eventchunker::EventChunkerConf; use err::Error; use netpod::log::*; -#[allow(unused)] -use netpod::timeunits::*; use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Shape}; use std::path::PathBuf; +use streams::eventchunker::{EventChunker, EventChunkerConf}; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -96,7 +92,7 @@ pub fn main() -> Result<(), Error> { let stats_conf = EventChunkerConf { disk_stats_every: ByteSize::mb(2), }; - let chunks = disk::eventchunker::EventChunker::from_start( + let chunks = EventChunker::from_start( inp, channel_config.clone(), range, diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 9d35da3..2ed4431 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -31,6 +31,7 @@ disk = { path = "../disk" } items = { path = "../items" } items_2 = { path = "../items_2" } parse = { path = "../parse" } +streams = { path = "../streams" } nodenet = { path = "../nodenet" } commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index bd20d8f..823174e 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -2,11 +2,11 @@ use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; use crate::{response, BodyStream}; use bytes::{BufMut, BytesMut}; -use disk::eventchunker::{EventChunkerConf, EventFull}; use futures_core::Stream; use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; +use items::eventfull::EventFull; use items::{RangeCompletableItem, Sitemty, StreamItem}; use itertools::Itertools; use netpod::query::RawEventsQuery; @@ -14,15 +14,16 @@ use netpod::timeunits::SEC; use netpod::{log::*, DiskIoTune, ReadSys, ACCEPT_ALL}; use netpod::{ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET}; use netpod::{ChannelSearchQuery, 
ChannelSearchResult, ProxyConfig, APP_JSON}; -use parse::channelconfig::{ - extract_matching_config_entry, read_local_config, Config, ConfigEntry, MatchingConfigEntry, -}; +use parse::channelconfig::extract_matching_config_entry; +use parse::channelconfig::read_local_config; +use parse::channelconfig::{Config, ConfigEntry, MatchingConfigEntry}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use streams::eventchunker::EventChunkerConf; use tracing_futures::Instrument; use url::Url; diff --git a/items/Cargo.toml b/items/Cargo.toml index b25bc42..05d0a8e 100644 --- a/items/Cargo.toml +++ b/items/Cargo.toml @@ -21,3 +21,4 @@ tokio = { version = "1.20.1", features = ["rt-multi-thread", "io-util", "net", " err = { path = "../err" } items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } +parse = { path = "../parse" } diff --git a/items/src/eventfull.rs b/items/src/eventfull.rs new file mode 100644 index 0000000..445a65b --- /dev/null +++ b/items/src/eventfull.rs @@ -0,0 +1,175 @@ +use bytes::BytesMut; +use netpod::{ScalarType, Shape}; +use parse::channelconfig::CompressionMethod; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use crate::{Appendable, ByteEstimate, Clearable, FrameTypeInnerStatic, PushableIndex, WithLen, WithTimestamps}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct EventFull { + pub tss: Vec, + pub pulses: Vec, + pub blobs: Vec>, + #[serde(serialize_with = "decomps_ser", deserialize_with = "decomps_de")] + // TODO allow access to `decomps` via method which checks first if `blobs` is already the decomp. + pub decomps: Vec>, + pub scalar_types: Vec, + pub be: Vec, + pub shapes: Vec, + pub comps: Vec>, +} + +fn decomps_ser(t: &Vec>, s: S) -> Result +where + S: Serializer, +{ + let a: Vec<_> = t + .iter() + .map(|k| match k { + None => None, + Some(j) => Some(j[..].to_vec()), + }) + .collect(); + Serialize::serialize(&a, s) +} + +fn decomps_de<'de, D>(d: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + let a: Vec>> = Deserialize::deserialize(d)?; + let a = a + .iter() + .map(|k| match k { + None => None, + Some(j) => { + let mut a = BytesMut::new(); + a.extend_from_slice(&j); + Some(a) + } + }) + .collect(); + Ok(a) +} + +impl EventFull { + pub fn empty() -> Self { + Self { + tss: vec![], + pulses: vec![], + blobs: vec![], + decomps: vec![], + scalar_types: vec![], + be: vec![], + shapes: vec![], + comps: vec![], + } + } + + pub fn add_event( + &mut self, + ts: u64, + pulse: u64, + blob: Vec, + decomp: Option, + scalar_type: ScalarType, + be: bool, + shape: Shape, + comp: Option, + ) { + self.tss.push(ts); + self.pulses.push(pulse); + self.blobs.push(blob); + self.decomps.push(decomp); + self.scalar_types.push(scalar_type); + self.be.push(be); + self.shapes.push(shape); + self.comps.push(comp); + } + + pub fn decomp(&self, i: usize) -> &[u8] { + match &self.decomps[i] { + Some(decomp) => &decomp, + None => &self.blobs[i], + } + } +} + +impl FrameTypeInnerStatic for EventFull { + const FRAME_TYPE_ID: u32 = crate::EVENT_FULL_FRAME_TYPE_ID; +} + +impl WithLen for EventFull { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl Appendable for EventFull { + fn empty_like_self(&self) -> Self { + Self::empty() + } + + // TODO expensive, get rid of it. 
+ fn append(&mut self, src: &Self) { + self.tss.extend_from_slice(&src.tss); + self.pulses.extend_from_slice(&src.pulses); + self.blobs.extend_from_slice(&src.blobs); + self.decomps.extend_from_slice(&src.decomps); + self.scalar_types.extend_from_slice(&src.scalar_types); + self.be.extend_from_slice(&src.be); + self.shapes.extend_from_slice(&src.shapes); + self.comps.extend_from_slice(&src.comps); + } + + fn append_zero(&mut self, _ts1: u64, _ts2: u64) { + // TODO do we still need this type? + todo!() + } +} + +impl Clearable for EventFull { + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.blobs.clear(); + self.decomps.clear(); + self.scalar_types.clear(); + self.be.clear(); + self.shapes.clear(); + self.comps.clear(); + } +} + +impl WithTimestamps for EventFull { + fn ts(&self, ix: usize) -> u64 { + self.tss[ix] + } +} + +impl ByteEstimate for EventFull { + fn byte_estimate(&self) -> u64 { + if self.tss.len() == 0 { + 0 + } else { + // TODO that is clumsy... it assumes homogenous types. + // TODO improve via a const fn on NTY + let decomp_len = self.decomps[0].as_ref().map_or(0, |h| h.len()); + self.tss.len() as u64 * (40 + self.blobs[0].len() as u64 + decomp_len as u64) + } + } +} + +impl PushableIndex for EventFull { + // TODO check all use cases, can't we move? + fn push_index(&mut self, src: &Self, ix: usize) { + self.tss.push(src.tss[ix]); + self.pulses.push(src.pulses[ix]); + self.blobs.push(src.blobs[ix].clone()); + self.decomps.push(src.decomps[ix].clone()); + self.scalar_types.push(src.scalar_types[ix].clone()); + self.be.push(src.be[ix]); + self.shapes.push(src.shapes[ix].clone()); + self.comps.push(src.comps[ix].clone()); + } +} diff --git a/items/src/items.rs b/items/src/items.rs index c732548..3345f87 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -1,6 +1,7 @@ pub mod binnedevents; pub mod binsdim0; pub mod binsdim1; +pub mod eventfull; pub mod eventsitem; pub mod frame; pub mod inmem; diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index 8457f56..c3c1742 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -34,3 +34,4 @@ items_2 = { path = "../items_2" } dbconn = { path = "../dbconn" } scyllaconn = { path = "../scyllaconn" } taskrun = { path = "../taskrun" } +streams = { path = "../streams" } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 8a28077..215ba9f 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -1,7 +1,3 @@ -#[cfg(test)] -mod test; - -use disk::frame::inmem::InMemoryFrameAsyncReadStream; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -14,11 +10,15 @@ use netpod::AggKind; use netpod::{NodeConfigCached, PerfOpts}; use std::net::SocketAddr; use std::pin::Pin; +use streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; +#[cfg(test)] +mod test; + pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> { let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw); let lis = tokio::net::TcpListener::bind(addr).await?; diff --git a/streams/Cargo.toml b/streams/Cargo.toml index b997b28..9955072 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] +tokio = { version = "1.21.2", features = ["io-util", "net", "time", "sync", "fs"] } tracing = "0.1.26" futures-core = "0.3.15" futures-util = "0.3.15" @@ -13,7 +14,12 @@ serde_json = "1.0" 
serde_cbor = "0.11.1" bincode = "1.3.3" bytes = "1.0.1" +arrayref = "0.3.6" +crc32fast = "1.3.2" +byteorder = "1.4.3" chrono = { version = "0.4.19", features = ["serde"] } err = { path = "../err" } netpod = { path = "../netpod" } items = { path = "../items" } +parse = { path = "../parse" } +bitshuffle = { path = "../bitshuffle" } diff --git a/streams/src/dtflags.rs b/streams/src/dtflags.rs new file mode 100644 index 0000000..c8ff5fd --- /dev/null +++ b/streams/src/dtflags.rs @@ -0,0 +1,4 @@ +pub const COMPRESSION: u8 = 0x80; +pub const ARRAY: u8 = 0x40; +pub const BIG_ENDIAN: u8 = 0x20; +pub const SHAPE: u8 = 0x10; diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs new file mode 100644 index 0000000..ee5d67c --- /dev/null +++ b/streams/src/eventchunker.rs @@ -0,0 +1,592 @@ +use crate::filechunkread::FileChunkRead; +use crate::needminbuffer::NeedMinBuffer; +use bitshuffle::bitshuffle_decompress; +use bytes::{Buf, BytesMut}; +use err::Error; +use futures_util::{Stream, StreamExt}; +use items::eventfull::EventFull; +use items::{ + RangeCompletableItem, StatsItem, + StreamItem, WithLen, +}; +use netpod::histo::HistoLog2; +use netpod::log::*; +use netpod::timeunits::SEC; +use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape}; +use parse::channelconfig::CompressionMethod; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +pub struct EventChunker { + inp: NeedMinBuffer, + state: DataFileState, + need_min: u32, + channel_config: ChannelConfig, + errored: bool, + completed: bool, + range: NanoRange, + stats_conf: EventChunkerConf, + seen_beyond_range: bool, + sent_beyond_range: bool, + data_emit_complete: bool, + final_stats_sent: bool, + parsed_bytes: u64, + dbg_path: PathBuf, + max_ts: u64, + expand: bool, + do_decompress: bool, + decomp_dt_histo: HistoLog2, + item_len_emit_histo: HistoLog2, + seen_before_range_count: usize, + seen_after_range_count: usize, + unordered_warn_count: usize, + repeated_ts_warn_count: usize, +} + +impl Drop for EventChunker { + fn drop(&mut self) { + // TODO collect somewhere + debug!( + "EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}", + self.decomp_dt_histo, self.item_len_emit_histo + ); + } +} + +enum DataFileState { + FileHeader, + Event, +} + +struct ParseResult { + events: EventFull, + parsed_bytes: u64, +} + +#[derive(Clone, Debug)] +pub struct EventChunkerConf { + pub disk_stats_every: ByteSize, +} + +impl EventChunkerConf { + pub fn new(disk_stats_every: ByteSize) -> Self { + Self { disk_stats_every } + } +} + +impl EventChunker { + // TODO `expand` flag usage + pub fn from_start( + inp: Pin> + Send>>, + channel_config: ChannelConfig, + range: NanoRange, + stats_conf: EventChunkerConf, + dbg_path: PathBuf, + expand: bool, + do_decompress: bool, + ) -> Self { + trace!("EventChunker::from_start"); + let mut inp = NeedMinBuffer::new(inp); + inp.set_need_min(6); + Self { + inp, + state: DataFileState::FileHeader, + need_min: 6, + channel_config, + errored: false, + completed: false, + range, + stats_conf, + seen_beyond_range: false, + sent_beyond_range: false, + data_emit_complete: false, + final_stats_sent: false, + parsed_bytes: 0, + dbg_path, + max_ts: 0, + expand, + do_decompress, + decomp_dt_histo: HistoLog2::new(8), + item_len_emit_histo: HistoLog2::new(0), + seen_before_range_count: 0, + seen_after_range_count: 0, + unordered_warn_count: 0, + repeated_ts_warn_count: 0, + } + } + + // TODO `expand` flag usage + pub fn 
from_event_boundary( + inp: Pin> + Send>>, + channel_config: ChannelConfig, + range: NanoRange, + stats_conf: EventChunkerConf, + dbg_path: PathBuf, + expand: bool, + do_decompress: bool, + ) -> Self { + let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress); + ret.state = DataFileState::Event; + ret.need_min = 4; + ret.inp.set_need_min(4); + ret + } + + fn parse_buf(&mut self, buf: &mut BytesMut) -> Result { + span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf)) + } + + fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { + let mut ret = EventFull::empty(); + let mut parsed_bytes = 0; + use byteorder::{ReadBytesExt, BE}; + loop { + if (buf.len() as u32) < self.need_min { + break; + } + match self.state { + DataFileState::FileHeader => { + if buf.len() < 6 { + Err(Error::with_msg("need min 6 for FileHeader"))?; + } + let mut sl = std::io::Cursor::new(buf.as_ref()); + let fver = sl.read_i16::().unwrap(); + if fver != 0 { + Err(Error::with_msg("unexpected data file version"))?; + } + let len = sl.read_i32::().unwrap(); + if len <= 0 || len >= 128 { + Err(Error::with_msg("large channel header len"))?; + } + let totlen = len as usize + 2; + if buf.len() < totlen { + self.need_min = totlen as u32; + break; + } else { + sl.advance(len as usize - 8); + let len2 = sl.read_i32::().unwrap(); + if len != len2 { + Err(Error::with_msg("channel header len mismatch"))?; + } + String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?; + self.state = DataFileState::Event; + self.need_min = 4; + buf.advance(totlen); + parsed_bytes += totlen as u64; + } + } + DataFileState::Event => { + let p0 = 0; + let mut sl = std::io::Cursor::new(buf.as_ref()); + let len = sl.read_i32::().unwrap(); + if len < 20 || len > 1024 * 1024 * 20 { + Err(Error::with_msg("unexpected large event chunk"))?; + } + let len = len as u32; + if (buf.len() as u32) < len { + self.need_min = len as u32; + break; + } else { + let mut sl = std::io::Cursor::new(buf.as_ref()); + let len1b = sl.read_i32::().unwrap(); + assert!(len == len1b as u32); + let _ttl = sl.read_i64::().unwrap(); + let ts = sl.read_i64::().unwrap() as u64; + let pulse = sl.read_i64::().unwrap() as u64; + if ts == self.max_ts { + if self.repeated_ts_warn_count < 20 { + let msg = format!( + "EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", + self.repeated_ts_warn_count, + ts / SEC, + ts % SEC, + self.max_ts / SEC, + self.max_ts % SEC, + self.channel_config.shape, + self.dbg_path + ); + warn!("{}", msg); + self.repeated_ts_warn_count += 1; + } + } + if ts < self.max_ts { + if self.unordered_warn_count < 20 { + let msg = format!( + "EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", + self.unordered_warn_count, + ts / SEC, + ts % SEC, + self.max_ts / SEC, + self.max_ts % SEC, + self.channel_config.shape, + self.dbg_path + ); + warn!("{}", msg); + self.unordered_warn_count += 1; + let e = Error::with_public_msg_no_trace(msg); + return Err(e); + } + } + self.max_ts = ts; + if ts >= self.range.end { + self.seen_after_range_count += 1; + if !self.expand || self.seen_after_range_count >= 2 { + self.seen_beyond_range = true; + self.data_emit_complete = true; + break; + } + } + if ts < self.range.beg { + self.seen_before_range_count += 1; + if self.seen_before_range_count > 1 { + let msg = format!( + "seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}", + ts / SEC, + ts % 
SEC, + self.range.beg / SEC, + self.range.beg % SEC, + self.range.end / SEC, + self.range.end % SEC, + pulse, + self.channel_config.shape, + self.dbg_path + ); + warn!("{}", msg); + let e = Error::with_public_msg(msg); + Err(e)?; + } + } + let _ioc_ts = sl.read_i64::().unwrap(); + let status = sl.read_i8().unwrap(); + let severity = sl.read_i8().unwrap(); + let optional = sl.read_i32::().unwrap(); + if status != 0 { + Err(Error::with_msg(format!("status != 0: {}", status)))?; + } + if severity != 0 { + Err(Error::with_msg(format!("severity != 0: {}", severity)))?; + } + if optional != -1 { + Err(Error::with_msg(format!("optional != -1: {}", optional)))?; + } + let type_flags = sl.read_u8().unwrap(); + let type_index = sl.read_u8().unwrap(); + if type_index > 13 { + Err(Error::with_msg(format!("type_index: {}", type_index)))?; + } + let scalar_type = ScalarType::from_dtype_index(type_index)?; + use super::dtflags::*; + let is_compressed = type_flags & COMPRESSION != 0; + let is_array = type_flags & ARRAY != 0; + let is_big_endian = type_flags & BIG_ENDIAN != 0; + let is_shaped = type_flags & SHAPE != 0; + if let Shape::Wave(_) = self.channel_config.shape { + if !is_array { + Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?; + } + } + let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 }; + let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 }; + assert!(compression_method <= 0); + assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2)); + let mut shape_lens = [0, 0, 0, 0]; + for i1 in 0..shape_dim { + shape_lens[i1 as usize] = sl.read_u32::().unwrap(); + } + let shape_this = { + if is_shaped { + if shape_dim == 1 { + Shape::Wave(shape_lens[0]) + } else if shape_dim == 2 { + Shape::Image(shape_lens[0], shape_lens[1]) + } else { + err::todoval() + } + } else { + Shape::Scalar + } + }; + let comp_this = if is_compressed { + if compression_method == 0 { + Some(CompressionMethod::BitshuffleLZ4) + } else { + err::todoval() + } + } else { + None + }; + let p1 = sl.position(); + let k1 = len as u64 - (p1 - p0) - 4; + if is_compressed { + //debug!("event ts {} is_compressed {}", ts, is_compressed); + let value_bytes = sl.read_u64::().unwrap(); + let block_size = sl.read_u32::().unwrap(); + //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size); + match self.channel_config.shape { + Shape::Scalar => { + assert!(value_bytes < 1024 * 1); + } + Shape::Wave(_) => { + assert!(value_bytes < 1024 * 64); + } + Shape::Image(_, _) => { + assert!(value_bytes < 1024 * 1024 * 20); + } + } + assert!(block_size <= 1024 * 32); + let type_size = scalar_type.bytes() as u32; + let ele_count = value_bytes / type_size as u64; + let ele_size = type_size; + match self.channel_config.shape { + Shape::Scalar => { + if is_array { + Err(Error::with_msg(format!( + "ChannelConfig expects Scalar but we find event is_array" + )))?; + } + } + Shape::Wave(dim1count) => { + if dim1count != ele_count as u32 { + Err(Error::with_msg(format!( + "ChannelConfig expects {:?} but event has ele_count {}", + self.channel_config.shape, ele_count, + )))?; + } + } + Shape::Image(n1, n2) => { + let nt = n1 as usize * n2 as usize; + if nt != ele_count as usize { + Err(Error::with_msg(format!( + "ChannelConfig expects {:?} but event has ele_count {}", + self.channel_config.shape, ele_count, + )))?; + } + } + } + let decomp = { + if self.do_decompress { + let 
ts1 = Instant::now(); + let decomp_bytes = (type_size * ele_count as u32) as usize; + let mut decomp = BytesMut::with_capacity(decomp_bytes); + unsafe { + decomp.set_len(decomp_bytes); + } + // TODO limit the buf slice range + match bitshuffle_decompress( + &buf.as_ref()[(p1 as usize + 12)..(p1 as usize + k1 as usize)], + &mut decomp, + ele_count as usize, + ele_size as usize, + 0, + ) { + Ok(c1) => { + assert!(c1 as u64 + 12 == k1); + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1); + self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros()); + Some(decomp) + } + Err(e) => { + return Err(Error::with_msg(format!("decompression failed {:?}", e)))?; + } + } + } else { + None + } + }; + ret.add_event( + ts, + pulse, + buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(), + decomp, + ScalarType::from_dtype_index(type_index)?, + is_big_endian, + shape_this, + comp_this, + ); + } else { + if len < p1 as u32 + 4 { + let msg = format!("uncomp len: {} p1: {}", len, p1); + Err(Error::with_msg(msg))?; + } + let vlen = len - p1 as u32 - 4; + // TODO in this case, decomp and comp is the same and not needed. + let decomp = BytesMut::from(&buf[p1 as usize..(p1 as u32 + vlen) as usize]); + ret.add_event( + ts, + pulse, + buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(), + Some(decomp), + ScalarType::from_dtype_index(type_index)?, + is_big_endian, + shape_this, + comp_this, + ); + } + buf.advance(len as usize); + parsed_bytes += len as u64; + self.need_min = 4; + } + } + } + } + Ok(ParseResult { + events: ret, + parsed_bytes, + }) + } +} + +impl Stream for EventChunker { + type Item = Result>, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + break if self.completed { + panic!("EventChunker poll_next on completed"); + } else if self.errored { + self.completed = true; + Ready(None) + } else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 { + let item = EventDataReadStats { + parsed_bytes: self.parsed_bytes, + }; + self.parsed_bytes = 0; + let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item)); + Ready(Some(Ok(ret))) + } else if self.sent_beyond_range { + self.completed = true; + Ready(None) + } else if self.final_stats_sent { + self.sent_beyond_range = true; + trace!("sent_beyond_range"); + if self.seen_beyond_range { + trace!("sent_beyond_range RangeComplete"); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else { + trace!("sent_beyond_range non-complete"); + continue 'outer; + } + } else if self.data_emit_complete { + let item = EventDataReadStats { + parsed_bytes: self.parsed_bytes, + }; + self.parsed_bytes = 0; + let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item)); + self.final_stats_sent = true; + Ready(Some(Ok(ret))) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(mut fcr))) => { + if false { + // TODO collect for stats: + info!( + "file read bytes {} ms {}", + fcr.buf().len(), + fcr.duration().as_millis() + ); + } + let r = self.parse_buf(fcr.buf_mut()); + match r { + Ok(res) => { + self.parsed_bytes += res.parsed_bytes; + if fcr.buf().len() > 0 { + // TODO gather stats about this: + self.inp.put_back(fcr); + } + match self.channel_config.shape { + Shape::Scalar => { + if self.need_min > 1024 * 8 { + let msg = + format!("spurious EventChunker asks for need_min {}", self.need_min); + self.errored = true; + return Ready(Some(Err(Error::with_msg(msg)))); + } + } + Shape::Wave(_) => { + if 
self.need_min > 1024 * 32 { + let msg = + format!("spurious EventChunker asks for need_min {}", self.need_min); + self.errored = true; + return Ready(Some(Err(Error::with_msg(msg)))); + } + } + Shape::Image(_, _) => { + if self.need_min > 1024 * 1024 * 20 { + let msg = + format!("spurious EventChunker asks for need_min {}", self.need_min); + self.errored = true; + return Ready(Some(Err(Error::with_msg(msg)))); + } + } + } + let x = self.need_min; + self.inp.set_need_min(x); + if false { + info!( + "EventChunker emits {} events tss {:?}", + res.events.len(), + res.events.tss + ); + }; + self.item_len_emit_histo.ingest(res.events.len() as u32); + let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events)); + Ready(Some(Ok(ret))) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e.into()))) + } + } + } + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.data_emit_complete = true; + continue 'outer; + } + Pending => Pending, + } + }; + } + } +} + +#[cfg(test)] +mod test { + //use err::Error; + //use netpod::timeunits::*; + //use netpod::{ByteSize, Nanos}; + + /* + #[test] + fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> { + let chn = netpod::Channel { + backend: "testbackend".into(), + name: "scalar-i32-be".into(), + }; + // TODO read config from disk. + let channel_config = ChannelConfig { + channel: chn, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: netpod::ScalarType::I32, + byte_order: netpod::ByteOrder::big_endian(), + shape: netpod::Shape::Scalar, + array: false, + compression: false, + }; + let cluster = taskrun::test_cluster(); + let node = cluster.nodes[nodeix].clone(); + let buffer_size = 512; + let event_chunker_conf = EventChunkerConf { + disk_stats_every: ByteSize::kb(1024), + }; + } + */ +} diff --git a/streams/src/filechunkread.rs b/streams/src/filechunkread.rs new file mode 100644 index 0000000..2bdc9c7 --- /dev/null +++ b/streams/src/filechunkread.rs @@ -0,0 +1,55 @@ +use bytes::BytesMut; +use std::fmt; +use std::time::Duration; + +pub struct FileChunkRead { + buf: BytesMut, + duration: Duration, +} + +impl FileChunkRead { + pub fn with_buf(buf: BytesMut) -> Self { + Self { + buf, + duration: Duration::from_millis(0), + } + } + + pub fn with_buf_dur(buf: BytesMut, duration: Duration) -> Self { + Self { buf, duration } + } + + pub fn into_buf(self) -> BytesMut { + self.buf + } + + pub fn buf(&self) -> &BytesMut { + &self.buf + } + + pub fn buf_mut(&mut self) -> &mut BytesMut { + &mut self.buf + } + + pub fn buf_take(&mut self) -> BytesMut { + core::mem::replace(&mut self.buf, BytesMut::new()) + } + + pub fn duration(&self) -> &Duration { + &self.duration + } + + pub fn duration_mut(&mut self) -> &mut Duration { + &mut self.duration + } +} + +impl fmt::Debug for FileChunkRead { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FileChunkRead") + .field("buf.len", &self.buf.len()) + .field("buf.cap", &self.buf.capacity()) + .field("duration", &self.duration) + .finish() + } +} diff --git a/streams/src/frames.rs b/streams/src/frames.rs new file mode 100644 index 0000000..dbb234f --- /dev/null +++ b/streams/src/frames.rs @@ -0,0 +1,2 @@ +pub mod eventsfromframes; +pub mod inmem; diff --git a/disk/src/raw/eventsfromframes.rs b/streams/src/frames/eventsfromframes.rs similarity index 98% rename from disk/src/raw/eventsfromframes.rs rename to streams/src/frames/eventsfromframes.rs index 99f58cf..b71fb48 100644 --- 
a/disk/src/raw/eventsfromframes.rs
+++ b/streams/src/frames/eventsfromframes.rs
@@ -1,4 +1,4 @@
-use crate::frame::inmem::InMemoryFrameAsyncReadStream;
+use super::inmem::InMemoryFrameAsyncReadStream;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use items::frame::decode_frame;
diff --git a/disk/src/frame/inmem.rs b/streams/src/frames/inmem.rs
similarity index 100%
rename from disk/src/frame/inmem.rs
rename to streams/src/frames/inmem.rs
diff --git a/streams/src/lib.rs b/streams/src/lib.rs
index 8896be0..2000417 100644
--- a/streams/src/lib.rs
+++ b/streams/src/lib.rs
@@ -1 +1,7 @@
+pub mod dtflags;
+pub mod eventchunker;
+pub mod filechunkread;
+pub mod frames;
+pub mod needminbuffer;
 pub mod rangefilter;
+pub mod tcprawclient;
diff --git a/streams/src/needminbuffer.rs b/streams/src/needminbuffer.rs
new file mode 100644
index 0000000..ccfea61
--- /dev/null
+++ b/streams/src/needminbuffer.rs
@@ -0,0 +1,104 @@
+use crate::filechunkread::FileChunkRead;
+use err::Error;
+use futures_util::{Stream, StreamExt};
+use netpod::histo::HistoLog2;
+use netpod::log::*;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub struct NeedMinBuffer {
+    inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
+    need_min: u32,
+    left: Option<FileChunkRead>,
+    buf_len_histo: HistoLog2,
+    errored: bool,
+    completed: bool,
+}
+
+impl NeedMinBuffer {
+    pub fn new(inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>) -> Self {
+        Self {
+            inp: inp,
+            need_min: 1,
+            left: None,
+            buf_len_histo: HistoLog2::new(8),
+            errored: false,
+            completed: false,
+        }
+    }
+
+    pub fn put_back(&mut self, buf: FileChunkRead) {
+        assert!(self.left.is_none());
+        self.left = Some(buf);
+    }
+
+    pub fn set_need_min(&mut self, need_min: u32) {
+        self.need_min = need_min;
+    }
+}
+
+// TODO collect somewhere else
+impl Drop for NeedMinBuffer {
+    fn drop(&mut self) {
+        debug!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo);
+    }
+}
+
+impl Stream for NeedMinBuffer {
+    type Item = Result<FileChunkRead, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        loop {
+            break if self.completed {
+                panic!("NeedMinBuffer poll_next on completed");
+            } else if self.errored {
+                self.completed = true;
+                return Ready(None);
+            } else {
+                match self.inp.poll_next_unpin(cx) {
+                    Ready(Some(Ok(mut fcr))) => {
+                        self.buf_len_histo.ingest(fcr.buf().len() as u32);
+                        //info!("NeedMinBuffer got buf len {}", fcr.buf.len());
+                        match self.left.take() {
+                            Some(mut lfcr) => {
+                                // TODO measure:
+                                lfcr.buf_mut().unsplit(fcr.buf_take());
+                                *lfcr.duration_mut() += *fcr.duration();
+                                let fcr = lfcr;
+                                if fcr.buf().len() as u32 >= self.need_min {
+                                    //info!("with left ready len {} need_min {}", buf.len(), self.need_min);
+                                    Ready(Some(Ok(fcr)))
+                                } else {
+                                    //info!("with left not enough len {} need_min {}", buf.len(), self.need_min);
+                                    self.left.replace(fcr);
+                                    continue;
+                                }
+                            }
+                            None => {
+                                if fcr.buf().len() as u32 >= self.need_min {
+                                    //info!("simply ready len {} need_min {}", buf.len(), self.need_min);
+                                    Ready(Some(Ok(fcr)))
+                                } else {
+                                    //info!("no previous leftover, need more len {} need_min {}", buf.len(), self.need_min);
+                                    self.left.replace(fcr);
+                                    continue;
+                                }
+                            }
+                        }
+                    }
+                    Ready(Some(Err(e))) => {
+                        self.errored = true;
+                        Ready(Some(Err(e.into())))
+                    }
+                    Ready(None) => {
+                        // TODO collect somewhere
+                        debug!("NeedMinBuffer histo: {:?}", self.buf_len_histo);
+                        Ready(None)
+                    }
+                    Pending => Pending,
+                }
+            };
+        }
+    }
+}
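NeedMinBuffer joins incoming FileChunkRead chunks and only yields once at least need_min bytes are buffered. A minimal standalone sketch of that accumulation rule, reduced to plain BytesMut values so it compiles on its own; the accumulate function and the main driver are illustrative names, not part of this patch:

    use bytes::BytesMut;

    // Join the new chunk with any leftover from the previous call (the same
    // BytesMut::unsplit step NeedMinBuffer uses) and yield only when the
    // combined buffer has reached need_min bytes; otherwise keep it for later.
    fn accumulate(left: &mut Option<BytesMut>, incoming: BytesMut, need_min: usize) -> Option<BytesMut> {
        let buf = match left.take() {
            Some(mut l) => {
                l.unsplit(incoming);
                l
            }
            None => incoming,
        };
        if buf.len() >= need_min {
            Some(buf)
        } else {
            // Not enough yet: stash it until the next chunk arrives.
            *left = Some(buf);
            None
        }
    }

    fn main() {
        let mut left = None;
        assert!(accumulate(&mut left, BytesMut::from(&b"ab"[..]), 4).is_none());
        let joined = accumulate(&mut left, BytesMut::from(&b"cd"[..]), 4).expect("enough bytes now");
        assert_eq!(&joined[..], b"abcd");
    }

EventChunker relies on exactly this contract: after a partial parse it returns the unconsumed remainder with put_back and raises need_min via set_need_min, so the next buffer it receives is large enough to finish the pending header or event.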
diff --git a/disk/src/raw/client.rs b/streams/src/tcprawclient.rs
similarity index 94%
rename from disk/src/raw/client.rs
rename to streams/src/tcprawclient.rs
index 34c18e4..5a59593 100644
--- a/disk/src/raw/client.rs
+++ b/streams/src/tcprawclient.rs
@@ -5,11 +5,11 @@ Delivers event data (not yet time-binned) from local storage and provides
 client to request such data from nodes.
 */
 
-use crate::eventchunker::EventFull;
-use crate::frame::inmem::InMemoryFrameAsyncReadStream;
-use crate::raw::eventsfromframes::EventsFromFrames;
+use crate::frames::eventsfromframes::EventsFromFrames;
+use crate::frames::inmem::InMemoryFrameAsyncReadStream;
 use err::Error;
 use futures_core::Stream;
+use items::eventfull::EventFull;
 use items::frame::{make_frame, make_term_frame};
 use items::{EventQueryJsonStringFrame, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem};
 use netpod::log::*;
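For orientation, the fixed per-event header that EventChunker::parse_buf_inner reads ahead of the payload (in the streams/src/eventchunker.rs hunks earlier in this patch) can be captured in a few lines. A standalone sketch using the byteorder crate, following the same field order and big-endian encoding as those reads; the EventHead struct and read_event_head are illustrative names only:

    use byteorder::{ReadBytesExt, BE};
    use std::io::Cursor;

    #[derive(Debug)]
    struct EventHead {
        len: i32,
        ttl: i64,
        ts: u64,
        pulse: u64,
        ioc_ts: i64,
        status: i8,
        severity: i8,
        optional: i32,
        type_flags: u8,
        type_index: u8,
    }

    fn read_event_head(buf: &[u8]) -> std::io::Result<EventHead> {
        let mut c = Cursor::new(buf);
        Ok(EventHead {
            // Total length of the event in bytes.
            len: c.read_i32::<BE>()?,
            ttl: c.read_i64::<BE>()?,
            // Event timestamp (nanoseconds) and pulse id.
            ts: c.read_i64::<BE>()? as u64,
            pulse: c.read_i64::<BE>()? as u64,
            ioc_ts: c.read_i64::<BE>()?,
            // The chunker requires status == 0, severity == 0, optional == -1.
            status: c.read_i8()?,
            severity: c.read_i8()?,
            optional: c.read_i32::<BE>()?,
            // Bit flags (compression, array, endianness, shape) and dtype index.
            type_flags: c.read_u8()?,
            type_index: c.read_u8()?,
        })
    }

In the patch, the optional compression-method byte, the shape dimension and the per-dimension lengths follow these fields, and only then the (possibly bitshuffle-LZ4 compressed) value payload.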
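The eventchunker.rs hunks also check that the configured shape agrees with the element count implied by a compressed event (value_bytes divided by the scalar size). A reduced sketch of that rule, with Shape inlined as a local enum so the snippet is self-contained; the real type is netpod::Shape and the real code reports detailed errors instead of a bool:

    enum Shape {
        Scalar,
        Wave(u32),
        Image(u32, u32),
    }

    // Elements implied by the compressed payload, given the scalar size in bytes.
    fn ele_count(value_bytes: u64, type_size: u32) -> u64 {
        value_bytes / type_size as u64
    }

    // Accept or reject an event for the configured shape.
    fn shape_matches(config: &Shape, is_array: bool, ele_count: u64) -> bool {
        match config {
            // A scalar channel must not carry array-flagged events.
            Shape::Scalar => !is_array,
            // A waveform must contain exactly the configured number of elements.
            Shape::Wave(n) => *n as u64 == ele_count,
            // An image must contain exactly rows * columns elements.
            Shape::Image(n1, n2) => (*n1 as u64) * (*n2 as u64) == ele_count,
        }
    }

    fn main() {
        // 1024-element f64 waveform: 8192 payload bytes, 8 bytes per element.
        assert!(shape_matches(&Shape::Wave(1024), true, ele_count(8192, 8)));
        assert!(!shape_matches(&Shape::Image(640, 480), true, ele_count(8192, 8)));
        assert!(shape_matches(&Shape::Scalar, false, 1));
    }

The Shape::Scalar arm mirrors the patch, which rejects scalar channels whenever an event carries the ARRAY type flag.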