diff --git a/disk/src/agg.rs b/disk/src/agg.rs index d7cf5fa..9c799e2 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -2,7 +2,7 @@ Aggregation and binning support. */ -use crate::EventFull; +use super::eventchunker::EventFull; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; use futures_core::Stream; diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 164e72e..982085a 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -42,7 +42,7 @@ async fn agg_x_dim_0_inner() { let bin_count = 20; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; - let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) + let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node) .into_dim_1_f32_stream() //.take(1000) .map(|q| { @@ -107,7 +107,7 @@ async fn agg_x_dim_1_inner() { let bin_count = 10; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; - let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) + let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node) .into_dim_1_f32_stream() //.take(1000) .map(|q| { @@ -165,7 +165,8 @@ async fn merge_0_inner() { .map(|k| make_test_node(k)) .map(|node| { let node = Arc::new(node); - crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream() + super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node) + .into_dim_1_f32_stream() }) .collect(); MergeDim1F32Stream::new(streams) diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs new file mode 100644 index 0000000..403dc3d --- /dev/null +++ b/disk/src/eventblobs.rs @@ -0,0 +1,94 @@ +use crate::eventchunker::{EventChunker, EventFull}; +use crate::{file_content_stream, open_files}; +use err::Error; +use futures_core::Stream; +use futures_util::StreamExt; +use netpod::{ChannelConfig, Node}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::fs::File; + +pub struct EventBlobsComplete { + channel_config: ChannelConfig, + file_chan: async_channel::Receiver>, + evs: Option, + buffer_size: u32, +} + +impl EventBlobsComplete { + pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: Arc) -> Self { + Self { + file_chan: open_files(query, node), + evs: None, + buffer_size: query.buffer_size, + channel_config, + } + } +} + +impl Stream for EventBlobsComplete { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use Poll::*; + 'outer: loop { + let z = match &mut self.evs { + Some(evs) => match evs.poll_next_unpin(cx) { + Ready(Some(k)) => Ready(Some(k)), + Ready(None) => { + self.evs = None; + continue 'outer; + } + Pending => Pending, + }, + None => match self.file_chan.poll_next_unpin(cx) { + Ready(Some(k)) => match k { + Ok(file) => { + let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); + let chunker = EventChunker::new(inp, self.channel_config.clone()); + self.evs.replace(chunker); + continue 'outer; + } + Err(e) => Ready(Some(Err(e))), + }, + Ready(None) => Ready(None), + Pending => Pending, + }, + }; + break z; + } + } +} + +pub fn event_blobs_complete( + query: &netpod::AggQuerySingleChannel, + node: Arc, +) -> impl Stream> + Send { + let query = query.clone(); + let node = node.clone(); + async_stream::stream! 
{ + let filerx = open_files(&query, node.clone()); + while let Ok(fileres) = filerx.recv().await { + match fileres { + Ok(file) => { + let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); + let mut chunker = EventChunker::new(inp, err::todoval()); + while let Some(evres) = chunker.next().await { + match evres { + Ok(evres) => { + yield Ok(evres); + } + Err(e) => { + yield Err(e) + } + } + } + } + Err(e) => { + yield Err(e); + } + } + } + } +} diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs new file mode 100644 index 0000000..64fdd33 --- /dev/null +++ b/disk/src/eventchunker.rs @@ -0,0 +1,323 @@ +use crate::NeedMinBuffer; +use bitshuffle::bitshuffle_decompress; +use bytes::{Buf, BytesMut}; +use err::Error; +use futures_core::Stream; +use futures_util::pin_mut; +use netpod::{ChannelConfig, ScalarType, Shape}; +use std::pin::Pin; +use std::task::{Context, Poll}; +#[allow(unused_imports)] +use tracing::{debug, error, info, span, trace, warn, Level}; + +pub struct EventChunker { + inp: NeedMinBuffer, + polled: u32, + state: DataFileState, + need_min: u32, + channel_config: ChannelConfig, + errored: bool, + completed: bool, +} + +enum DataFileState { + FileHeader, + Event, +} + +impl EventChunker { + pub fn new( + inp: Pin> + Send>>, + channel_config: ChannelConfig, + ) -> Self { + let mut inp = NeedMinBuffer::new(inp); + inp.set_need_min(6); + Self { + inp: inp, + polled: 0, + state: DataFileState::FileHeader, + need_min: 6, + channel_config, + errored: false, + completed: false, + } + } + + fn parse_buf(&mut self, buf: &mut BytesMut) -> Result { + span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf)) + } + + fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { + // must communicate to caller: + // what I've found in the buffer + // what I've consumed from the buffer + // how many bytes I need min to make progress + let mut ret = EventFull::empty(); + use byteorder::{ReadBytesExt, BE}; + loop { + info!("parse_buf LOOP buf len {} need_min {}", buf.len(), self.need_min); + if (buf.len() as u32) < self.need_min { + break; + } + match self.state { + DataFileState::FileHeader => { + assert!(buf.len() >= 6, "logic"); + let mut sl = std::io::Cursor::new(buf.as_ref()); + let fver = sl.read_i16::().unwrap(); + assert!(fver == 0, "unexpected file version"); + let len = sl.read_i32::().unwrap(); + assert!(len > 0 && len < 128, "unexpected data file header"); + let totlen = len as usize + 2; + if buf.len() < totlen { + info!("parse_buf not enough A totlen {}", totlen); + self.need_min = totlen as u32; + break; + } else { + sl.advance(len as usize - 8); + let len2 = sl.read_i32::().unwrap(); + assert!(len == len2, "len mismatch"); + let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap(); + info!("channel name {} len {} len2 {}", s1, len, len2); + self.state = DataFileState::Event; + self.need_min = 4; + buf.advance(totlen); + } + } + DataFileState::Event => { + let mut sl = std::io::Cursor::new(buf.as_ref()); + let len = sl.read_i32::().unwrap(); + assert!(len >= 20 && len < 1024 * 1024 * 10); + let len = len as u32; + info!( + "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- event len {}", + len, + ); + if (buf.len() as u32) < 20 { + // TODO gather stats about how often we find not enough input + //info!("parse_buf not enough B"); + self.need_min = len as u32; + break; + } else if (buf.len() as u32) < len { + { + // TODO this is just for testing + let mut sl = 
std::io::Cursor::new(buf.as_ref()); + sl.read_i32::().unwrap(); + sl.read_i64::().unwrap(); + let ts = sl.read_i64::().unwrap(); + info!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts); + } + self.need_min = len as u32; + break; + } else { + let mut sl = std::io::Cursor::new(buf.as_ref()); + let len1b = sl.read_i32::().unwrap(); + assert!(len == len1b as u32); + sl.read_i64::().unwrap(); + let ts = sl.read_i64::().unwrap() as u64; + let pulse = sl.read_i64::().unwrap() as u64; + sl.read_i64::().unwrap(); + let status = sl.read_i8().unwrap(); + let severity = sl.read_i8().unwrap(); + let optional = sl.read_i32::().unwrap(); + assert!(status == 0); + assert!(severity == 0); + assert!(optional == -1); + let type_flags = sl.read_u8().unwrap(); + let type_index = sl.read_u8().unwrap(); + assert!(type_index <= 13); + use super::dtflags::*; + let is_compressed = type_flags & COMPRESSION != 0; + let is_array = type_flags & ARRAY != 0; + let _is_big_endian = type_flags & BIG_ENDIAN != 0; + let is_shaped = type_flags & SHAPE != 0; + if let Shape::Wave(_) = self.channel_config.shape { + assert!(is_array); + } + let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 }; + let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 }; + assert!(compression_method <= 0); + assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2)); + let mut shape_lens = [0, 0, 0, 0]; + for i1 in 0..shape_dim { + shape_lens[i1 as usize] = sl.read_u32::().unwrap(); + } + if is_compressed { + //debug!("event ts {} is_compressed {}", ts, is_compressed); + let value_bytes = sl.read_u64::().unwrap(); + let block_size = sl.read_u32::().unwrap(); + let p1 = sl.position() as u32; + let k1 = len as u32 - p1 - 4; + //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size); + assert!(value_bytes < 1024 * 256); + assert!(block_size < 1024 * 32); + //let value_bytes = value_bytes; + let type_size = type_size(type_index); + let ele_count = value_bytes / type_size as u64; + let ele_size = type_size; + match self.channel_config.shape { + Shape::Wave(dim1count) => { + if dim1count != ele_count as u32 { + Err(Error::with_msg(format!( + "ChannelConfig expects {:?} but event has {:?}", + self.channel_config.shape, ele_count, + )))?; + } + } + Shape::Scalar => { + if is_array { + Err(Error::with_msg(format!( + "ChannelConfig expects Scalar but we find event is_array" + )))?; + } + } + } + let decomp_bytes = (type_size * ele_count as u32) as usize; + let mut decomp = BytesMut::with_capacity(decomp_bytes); + unsafe { + decomp.set_len(decomp_bytes); + } + //debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index); + match bitshuffle_decompress( + &buf.as_ref()[p1 as usize..], + &mut decomp, + ele_count as usize, + ele_size as usize, + 0, + ) { + Ok(c1) => { + assert!(c1 as u32 == k1); + debug!("decompress result c1 {} k1 {}", c1, k1); + ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index)); + } + Err(e) => { + Err(Error::with_msg(format!("decompression failed {:?}", e)))?; + } + }; + } else { + Err(Error::with_msg(format!( + "TODO uncompressed event parsing not yet implemented" + )))?; + } + info!("advance and reset need_min"); + buf.advance(len as usize); + self.need_min = 4; + } + } + } + } + info!("AFTER PARSE LOOP len {}", ret.tss.len()); + Ok(ParseResult { events: ret }) + } +} + +fn 
type_size(ix: u8) -> u32 { + match ix { + 0 => 1, + 1 => 1, + 2 => 1, + 3 => 1, + 4 => 2, + 5 => 2, + 6 => 2, + 7 => 4, + 8 => 4, + 9 => 8, + 10 => 8, + 11 => 4, + 12 => 8, + 13 => 1, + _ => panic!("logic"), + } +} + +struct ParseResult { + events: EventFull, +} + +impl Stream for EventChunker { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if self.completed { + panic!("EventChunker poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } + self.polled += 1; + if self.polled >= 20000 { + warn!("EventChunker poll limit reached"); + self.errored = true; + return Poll::Ready(Some(Err(Error::with_msg(format!("EventChunker poll limit reached"))))); + } + let g = &mut self.inp; + pin_mut!(g); + info!("EventChunker call input poll_next"); + match g.poll_next(cx) { + Ready(Some(Ok(mut buf))) => { + info!("EventChunker got buffer len {}", buf.len()); + let r = self.parse_buf(&mut buf); + match r { + Ok(res) => { + if buf.len() > 0 { + // TODO gather stats about this: + //info!("parse_buf returned {} leftover bytes to me", buf.len()); + self.inp.put_back(buf); + } + if self.need_min > 1024 * 8 { + let msg = format!("spurious EventChunker asks for need_min {}", self.need_min); + warn!("{}", msg); + self.errored = true; + Ready(Some(Err(Error::with_msg(msg)))) + } else { + let x = self.need_min; + self.inp.set_need_min(x); + Ready(Some(Ok(res.events))) + } + } + Err(e) => { + error!("EventChunker parse_buf returned error {:?}", e); + self.errored = true; + Ready(Some(Err(e.into()))) + } + } + } + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.completed = true; + Ready(None) + } + Pending => Pending, + } + } +} + +pub struct EventFull { + pub tss: Vec, + pub pulses: Vec, + pub decomps: Vec>, + pub scalar_types: Vec, +} + +impl EventFull { + pub fn empty() -> Self { + Self { + tss: vec![], + pulses: vec![], + decomps: vec![], + scalar_types: vec![], + } + } + + fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option, scalar_type: ScalarType) { + self.tss.push(ts); + self.pulses.push(pulse); + self.decomps.push(decomp); + self.scalar_types.push(scalar_type); + } +} diff --git a/disk/src/lib.rs b/disk/src/lib.rs index d2e32d8..040b882 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -1,11 +1,10 @@ use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; -use bitshuffle::bitshuffle_decompress; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Bytes, BytesMut}; use err::Error; use futures_core::Stream; use futures_util::future::FusedFuture; use futures_util::{pin_mut, select, FutureExt, StreamExt}; -use netpod::{ChannelConfig, Node, ScalarType, Shape}; +use netpod::{ChannelConfig, Node, Shape}; use std::future::Future; use std::path::PathBuf; use std::pin::Pin; @@ -21,6 +20,8 @@ pub mod agg; pub mod aggtest; pub mod cache; pub mod channelconfig; +pub mod eventblobs; +pub mod eventchunker; pub mod gen; pub mod merge; pub mod raw; @@ -350,7 +351,7 @@ pub fn parsed1( match fileres { Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = EventChunker::new(inp, err::todoval()); + let mut chunker = eventchunker::EventChunker::new(inp, err::todoval()); while let Some(evres) = chunker.next().await { match evres { Ok(evres) => { @@ -377,403 +378,6 @@ pub fn parsed1( } } } - -pub struct EventBlobsComplete { - channel_config: ChannelConfig, - file_chan: async_channel::Receiver>, - evs: Option, 
- buffer_size: u32, -} - -impl EventBlobsComplete { - pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: Arc) -> Self { - Self { - file_chan: open_files(query, node), - evs: None, - buffer_size: query.buffer_size, - channel_config, - } - } -} - -impl Stream for EventBlobsComplete { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - use Poll::*; - 'outer: loop { - let z = match &mut self.evs { - Some(evs) => match evs.poll_next_unpin(cx) { - Ready(Some(k)) => Ready(Some(k)), - Ready(None) => { - self.evs = None; - continue 'outer; - } - Pending => Pending, - }, - None => match self.file_chan.poll_next_unpin(cx) { - Ready(Some(k)) => match k { - Ok(file) => { - let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); - let chunker = EventChunker::new(inp, self.channel_config.clone()); - self.evs.replace(chunker); - continue 'outer; - } - Err(e) => Ready(Some(Err(e))), - }, - Ready(None) => Ready(None), - Pending => Pending, - }, - }; - break z; - } - } -} - -pub fn event_blobs_complete( - query: &netpod::AggQuerySingleChannel, - node: Arc, -) -> impl Stream> + Send { - let query = query.clone(); - let node = node.clone(); - async_stream::stream! { - let filerx = open_files(&query, node.clone()); - while let Ok(fileres) = filerx.recv().await { - match fileres { - Ok(file) => { - let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = EventChunker::new(inp, err::todoval()); - while let Some(evres) = chunker.next().await { - match evres { - Ok(evres) => { - yield Ok(evres); - } - Err(e) => { - yield Err(e) - } - } - } - } - Err(e) => { - yield Err(e); - } - } - } - } -} - -pub struct EventChunker { - inp: NeedMinBuffer, - polled: u32, - state: DataFileState, - need_min: u32, - channel_config: ChannelConfig, - errored: bool, - completed: bool, -} - -enum DataFileState { - FileHeader, - Event, -} - -impl EventChunker { - pub fn new( - inp: Pin> + Send>>, - channel_config: ChannelConfig, - ) -> Self { - let mut inp = NeedMinBuffer::new(inp); - inp.set_need_min(6); - Self { - inp: inp, - polled: 0, - state: DataFileState::FileHeader, - need_min: 6, - channel_config, - errored: false, - completed: false, - } - } - - fn parse_buf(&mut self, buf: &mut BytesMut) -> Result { - span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf)) - } - - fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { - // must communicate to caller: - // what I've found in the buffer - // what I've consumed from the buffer - // how many bytes I need min to make progress - let mut ret = EventFull::empty(); - use byteorder::{ReadBytesExt, BE}; - loop { - info!("parse_buf LOOP buf len {} need_min {}", buf.len(), self.need_min); - if (buf.len() as u32) < self.need_min { - break; - } - match self.state { - DataFileState::FileHeader => { - assert!(buf.len() >= 6, "logic"); - let mut sl = std::io::Cursor::new(buf.as_ref()); - let fver = sl.read_i16::().unwrap(); - assert!(fver == 0, "unexpected file version"); - let len = sl.read_i32::().unwrap(); - assert!(len > 0 && len < 128, "unexpected data file header"); - let totlen = len as usize + 2; - if buf.len() < totlen { - info!("parse_buf not enough A totlen {}", totlen); - self.need_min = totlen as u32; - break; - } else { - sl.advance(len as usize - 8); - let len2 = sl.read_i32::().unwrap(); - assert!(len == len2, "len mismatch"); - let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 
8)].to_vec()).unwrap(); - info!("channel name {} len {} len2 {}", s1, len, len2); - self.state = DataFileState::Event; - self.need_min = 4; - buf.advance(totlen); - } - } - DataFileState::Event => { - let mut sl = std::io::Cursor::new(buf.as_ref()); - let len = sl.read_i32::().unwrap(); - assert!(len >= 20 && len < 1024 * 1024 * 10); - let len = len as u32; - info!( - "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- event len {}", - len, - ); - if (buf.len() as u32) < 20 { - // TODO gather stats about how often we find not enough input - //info!("parse_buf not enough B"); - self.need_min = len as u32; - break; - } else if (buf.len() as u32) < len { - { - // TODO this is just for testing - let mut sl = std::io::Cursor::new(buf.as_ref()); - sl.read_i32::().unwrap(); - sl.read_i64::().unwrap(); - let ts = sl.read_i64::().unwrap(); - info!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts); - } - self.need_min = len as u32; - break; - } else { - let mut sl = std::io::Cursor::new(buf.as_ref()); - let len1b = sl.read_i32::().unwrap(); - assert!(len == len1b as u32); - sl.read_i64::().unwrap(); - let ts = sl.read_i64::().unwrap() as u64; - let pulse = sl.read_i64::().unwrap() as u64; - sl.read_i64::().unwrap(); - let status = sl.read_i8().unwrap(); - let severity = sl.read_i8().unwrap(); - let optional = sl.read_i32::().unwrap(); - assert!(status == 0); - assert!(severity == 0); - assert!(optional == -1); - let type_flags = sl.read_u8().unwrap(); - let type_index = sl.read_u8().unwrap(); - assert!(type_index <= 13); - use dtflags::*; - let is_compressed = type_flags & COMPRESSION != 0; - let is_array = type_flags & ARRAY != 0; - let _is_big_endian = type_flags & BIG_ENDIAN != 0; - let is_shaped = type_flags & SHAPE != 0; - if let Shape::Wave(_) = self.channel_config.shape { - assert!(is_array); - } - let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 }; - let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 }; - assert!(compression_method <= 0); - assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2)); - let mut shape_lens = [0, 0, 0, 0]; - for i1 in 0..shape_dim { - shape_lens[i1 as usize] = sl.read_u32::().unwrap(); - } - if is_compressed { - //debug!("event ts {} is_compressed {}", ts, is_compressed); - let value_bytes = sl.read_u64::().unwrap(); - let block_size = sl.read_u32::().unwrap(); - let p1 = sl.position() as u32; - let k1 = len as u32 - p1 - 4; - //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size); - assert!(value_bytes < 1024 * 256); - assert!(block_size < 1024 * 32); - //let value_bytes = value_bytes; - let type_size = type_size(type_index); - let ele_count = value_bytes / type_size as u64; - let ele_size = type_size; - match self.channel_config.shape { - Shape::Wave(dim1count) => { - if dim1count != ele_count as u32 { - Err(Error::with_msg(format!( - "ChannelConfig expects {:?} but event has {:?}", - self.channel_config.shape, ele_count, - )))?; - } - } - Shape::Scalar => { - if is_array { - Err(Error::with_msg(format!( - "ChannelConfig expects Scalar but we find event is_array" - )))?; - } - } - } - let decomp_bytes = (type_size * ele_count as u32) as usize; - let mut decomp = BytesMut::with_capacity(decomp_bytes); - unsafe { - decomp.set_len(decomp_bytes); - } - //debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index); - match 
bitshuffle_decompress( - &buf.as_ref()[p1 as usize..], - &mut decomp, - ele_count as usize, - ele_size as usize, - 0, - ) { - Ok(c1) => { - assert!(c1 as u32 == k1); - debug!("decompress result c1 {} k1 {}", c1, k1); - ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index)); - } - Err(e) => { - Err(Error::with_msg(format!("decompression failed {:?}", e)))?; - } - }; - } else { - Err(Error::with_msg(format!( - "TODO uncompressed event parsing not yet implemented" - )))?; - } - info!("advance and reset need_min"); - buf.advance(len as usize); - self.need_min = 4; - } - } - } - } - info!("AFTER PARSE LOOP len {}", ret.tss.len()); - Ok(ParseResult { events: ret }) - } -} - -fn type_size(ix: u8) -> u32 { - match ix { - 0 => 1, - 1 => 1, - 2 => 1, - 3 => 1, - 4 => 2, - 5 => 2, - 6 => 2, - 7 => 4, - 8 => 4, - 9 => 8, - 10 => 8, - 11 => 4, - 12 => 8, - 13 => 1, - _ => panic!("logic"), - } -} - -struct ParseResult { - events: EventFull, -} - -impl Stream for EventChunker { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - use Poll::*; - if self.completed { - panic!("EventChunker poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } - self.polled += 1; - if self.polled >= 20000 { - warn!("EventChunker poll limit reached"); - self.errored = true; - return Poll::Ready(Some(Err(Error::with_msg(format!("EventChunker poll limit reached"))))); - } - let g = &mut self.inp; - pin_mut!(g); - info!("EventChunker call input poll_next"); - match g.poll_next(cx) { - Ready(Some(Ok(mut buf))) => { - info!("EventChunker got buffer len {}", buf.len()); - let r = self.parse_buf(&mut buf); - match r { - Ok(res) => { - if buf.len() > 0 { - // TODO gather stats about this: - //info!("parse_buf returned {} leftover bytes to me", buf.len()); - self.inp.put_back(buf); - } - if self.need_min > 1024 * 8 { - let msg = format!("spurious EventChunker asks for need_min {}", self.need_min); - warn!("{}", msg); - self.errored = true; - Ready(Some(Err(Error::with_msg(msg)))) - } else { - let x = self.need_min; - self.inp.set_need_min(x); - Ready(Some(Ok(res.events))) - } - } - Err(e) => { - error!("EventChunker parse_buf returned error {:?}", e); - self.errored = true; - Ready(Some(Err(e.into()))) - } - } - } - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - self.completed = true; - Ready(None) - } - Pending => Pending, - } - } -} - -pub struct EventFull { - tss: Vec, - pulses: Vec, - decomps: Vec>, - scalar_types: Vec, -} - -impl EventFull { - pub fn empty() -> Self { - Self { - tss: vec![], - pulses: vec![], - decomps: vec![], - scalar_types: vec![], - } - } - - fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option, scalar_type: ScalarType) { - self.tss.push(ts); - self.pulses.push(pulse); - self.decomps.push(decomp); - self.scalar_types.push(scalar_type); - } -} - pub struct NeedMinBuffer { inp: Pin> + Send>>, need_min: u32, diff --git a/disk/src/merge.rs b/disk/src/merge.rs index d3b0074..f889f9a 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,5 +1,5 @@ use crate::agg::{Dim1F32Stream, MinMaxAvgScalarEventBatch, ValuesDim1}; -use crate::EventFull; +use crate::eventchunker::EventFull; use err::Error; use futures_core::Stream; use futures_util::StreamExt; diff --git a/disk/src/raw.rs b/disk/src/raw.rs index f6865cc..cd6a952 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -173,9 +173,9 @@ where assert!(bnew.capacity() >= self.wp); info!( 
"InMemoryFrameAsyncReadStream re-use {} bytes from previous i/o", - self.wp + self.wp, ); - bnew.put(&self.buf[..self.wp]); + bnew[0..].as_mut().put(&self.buf[..self.wp]); self.buf = bnew; } info!( @@ -273,7 +273,12 @@ where let nl = len as usize + HEAD; if self.bufcap < nl { // TODO count cases in production - self.bufcap += 2 * nl; + let n = 2 * nl; + warn!( + "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee ADJUST bufcap old {} new {}", + self.bufcap, n + ); + self.bufcap = n; } if nb >= nl { use bytes::Buf; @@ -460,9 +465,16 @@ pub async fn raw_service(node_config: Arc) -> Result<(), Error> { async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: Arc) -> Result<(), Error> { //use tracing_futures::Instrument; let span1 = span!(Level::INFO, "raw::raw_conn_handler"); - raw_conn_handler_inner(stream, addr, node_config) + let r = raw_conn_handler_inner(stream, addr, node_config) .instrument(span1) - .await + .await; + match r { + Ok(k) => Ok(k), + Err(e) => { + error!("raw_conn_handler sees error: {:?}", e); + Err(e) + } + } } type RawConnOut = Result; @@ -475,7 +487,10 @@ async fn raw_conn_handler_inner( match raw_conn_handler_inner_try(stream, addr, node_config).await { Ok(_) => (), Err(mut ce) => { - error!("raw_conn_handler_inner CAUGHT ERROR AND TRY TO SEND OVER TCP"); + /*error!( + "raw_conn_handler_inner CAUGHT ERROR AND TRY TO SEND OVER TCP {:?}", + ce.err + );*/ let buf = make_frame::(&Err(ce.err))?; match ce.netout.write(&buf).await { Ok(_) => (), @@ -553,7 +568,7 @@ async fn raw_conn_handler_inner_try( }, keyspace: 3, time_bin_size: DAY, - shape: Shape::Wave(1024), + shape: Shape::Wave(17), scalar_type: ScalarType::F64, big_endian: true, array: true, @@ -565,10 +580,11 @@ async fn raw_conn_handler_inner_try( // TODO use the requested buffer size buffer_size: 1024 * 4, }; - let mut s1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node_config.node.clone()) - .into_dim_1_f32_stream() - .take(10) - .into_binned_x_bins_1(); + let mut s1 = + super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), node_config.node.clone()) + .into_dim_1_f32_stream() + .take(10) + .into_binned_x_bins_1(); while let Some(item) = s1.next().await { if let Ok(k) = &item { info!("???????????????? emit item ts0: {:?}", k.tss.first()); diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 5a6473c..7c69a5d 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -1,5 +1,5 @@ use crate::spawn_test_hosts; -use bytes::{BufMut, BytesMut}; +use bytes::BytesMut; use chrono::Utc; use err::Error; use futures_util::TryStreamExt; diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 6a1a198..e0cf2e6 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -56,7 +56,7 @@ pub fn tracing_init() { .with_thread_names(true) //.with_max_level(tracing::Level::INFO) .with_env_filter(tracing_subscriber::EnvFilter::new( - "info,retrieval=trace,retrieval::test=trace,disk=trace,tokio_postgres=info", + "info,retrieval=trace,retrieval::test=trace,disk=info,tokio_postgres=info", )) .init(); }