diff --git a/disk/src/agg.rs b/disk/src/agg.rs index c56a022..3bdcdc5 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -216,11 +216,74 @@ impl MinMaxAvgScalarEventBatch { avgs: vec![], } } + pub fn from_full_frame(buf: &Bytes) -> Self { + info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len()); + let mut g = MinMaxAvgScalarEventBatch::empty(); + + let n1; + unsafe { + let ptr = (&buf[0] as *const u8) as *const [u8; 4]; + n1 = u32::from_le_bytes(*ptr); + trace!("--- +++ --- +++ --- +++ n1: {}", n1); + } + if n1 == 0 { + g + } else { + let n2 = n1 as usize; + g.tss.reserve(n2); + g.mins.reserve(n2); + g.maxs.reserve(n2); + g.avgs.reserve(n2); + unsafe { + // TODO Can I unsafely create ptrs and just assign them? + // TODO What are cases where I really need transmute? + g.tss.set_len(n2); + g.mins.set_len(n2); + g.maxs.set_len(n2); + g.avgs.set_len(n2); + let ptr0 = &buf[4] as *const u8; + { + let ptr1 = ptr0 as *const u64; + for i1 in 0..n2 { + g.tss[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8) * n2) as *const f32; + for i1 in 0..n2 { + g.mins[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8 + 4) * n2) as *const f32; + for i1 in 0..n2 { + g.maxs[i1] = *ptr1; + } + } + { + let ptr1 = ptr0.add((8 + 4 + 4) * n2) as *const f32; + for i1 in 0..n2 { + g.avgs[i1] = *ptr1; + } + } + } + info!("CONTENT {:?}", g); + g + } + } } impl std::fmt::Debug for MinMaxAvgScalarEventBatch { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(fmt, "MinMaxAvgScalarEventBatch count {}", self.tss.len()) + write!( + fmt, + "MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?}", + self.tss.len(), + self.tss, + self.mins, + self.maxs, + self.avgs, + ) } } @@ -242,18 +305,24 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch { impl Frameable for MinMaxAvgScalarEventBatch { fn serialized(&self) -> Bytes { - assert!(self.tss.len() != 0); let n1 = self.tss.len(); let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4)); g.put_u32_le(n1 as u32); - let a = unsafe { std::slice::from_raw_parts(&self.tss[0] as *const u64 as *const u8, size_of::() * n1) }; - g.put(a); - let a = unsafe { std::slice::from_raw_parts(&self.mins[0] as *const f32 as *const u8, size_of::() * n1) }; - g.put(a); - let a = unsafe { std::slice::from_raw_parts(&self.maxs[0] as *const f32 as *const u8, size_of::() * n1) }; - g.put(a); - let a = unsafe { std::slice::from_raw_parts(&self.avgs[0] as *const f32 as *const u8, size_of::() * n1) }; - g.put(a); + if n1 > 0 { + let ptr = &self.tss[0] as *const u64 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.mins[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.maxs[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.avgs[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + } + info!("impl Frameable for MinMaxAvgScalarEventBatch g.len() {}", g.len()); g.freeze() } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 731f5b9..153eda9 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -69,12 +69,10 @@ pub async fn binned_bytes_for_http( let e2 = &channel_config.entries[i1 + 1]; if e1.ts < query.range.end && e2.ts >= query.range.beg { ixs.push(i1); - } else { } } else { if e1.ts < query.range.end { ixs.push(i1); - } else { } } } @@ -109,7 +107,7 @@ pub async fn binned_bytes_for_http( } None => { // Merge raw data - error!("TODO merge raw data"); + error!("binned_bytes_for_http TODO merge raw data"); todo!() } } @@ -128,8 +126,9 @@ impl Stream for BinnedBytesForHttpStream { use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(_k))) => { + error!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! BinnedBytesForHttpStream TODO serialize to bytes"); let mut buf = BytesMut::with_capacity(250); - buf.put(&b"TODO serialize to bytes\n"[..]); + buf.put(&b"BinnedBytesForHttpStream TODO serialize to bytes\n"[..]); Ready(Some(Ok(buf.freeze()))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), @@ -287,8 +286,10 @@ impl PreBinnedValueStream { }; let evq = Arc::new(evq); let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone()); - error!("try_setup_fetch_prebinned_higher_res TODO emit actual value"); - let s2 = s1.map_ok(|_k| MinMaxAvgScalarBinBatch::empty()); + let s2 = s1.map_ok(|_k| { + error!("try_setup_fetch_prebinned_higher_res TODO emit actual value"); + MinMaxAvgScalarBinBatch::empty() + }); self.fut2 = Some(Box::pin(s2)); } } @@ -453,7 +454,7 @@ impl Stream for MergedFromRemotes { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - trace!("PreBinnedAssembledFromRemotes MAIN POLL"); + trace!("MergedFromRemotes MAIN POLL"); use Poll::*; // TODO this has several stages: // First, establish async all connections. @@ -462,9 +463,18 @@ impl Stream for MergedFromRemotes { break if let Some(fut) = &mut self.merged { debug!("MergedFromRemotes POLL merged"); match fut.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => Ready(Some(Ok(k))), - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => Ready(None), + Ready(Some(Ok(k))) => { + info!("MergedFromRemotes Ready Some Ok"); + Ready(Some(Ok(k))) + } + Ready(Some(Err(e))) => { + info!("MergedFromRemotes Ready Some Err"); + Ready(Some(Err(e))) + } + Ready(None) => { + info!("MergedFromRemotes Ready None"); + Ready(None) + } Pending => Pending, } } else { @@ -474,10 +484,10 @@ impl Stream for MergedFromRemotes { if self.nodein[i1].is_none() { let f = &mut self.tcp_establish_futs[i1]; pin_mut!(f); - info!("tcp_establish_futs POLLING INPUT ESTAB {}", i1); + info!("MergedFromRemotes tcp_establish_futs POLLING INPUT ESTAB {}", i1); match f.poll(cx) { Ready(Ok(k)) => { - info!("tcp_establish_futs ESTABLISHED INPUT {}", i1); + info!("MergedFromRemotes tcp_establish_futs ESTABLISHED INPUT {}", i1); self.nodein[i1] = Some(k); } Ready(Err(e)) => return Ready(Some(Err(e))), @@ -493,14 +503,14 @@ impl Stream for MergedFromRemotes { Pending } else { if c1 == self.tcp_establish_futs.len() { - debug!("SETTING UP MERGED STREAM"); + debug!("MergedFromRemotes SETTING UP MERGED STREAM"); // TODO set up the merged stream let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); let s1 = MergedMinMaxAvgScalarStream::new(inps); self.merged = Some(Box::pin(s1)); } else { - error!( - "NOTHING PENDING TODO WHAT TO DO? {} {}", + info!( + "MergedFromRemotes conn / estab {} {}", c1, self.tcp_establish_futs.len() ); diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 3de4307..5cce13a 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -48,39 +48,43 @@ pub async fn x_processed_stream_from_node( let (netin, netout) = net.into_split(); netout.forget(); debug!("x_processed_stream_from_node WRITTEN"); - let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: netin }; + let frames = InMemoryFrameAsyncReadStream::new(netin); + let s2 = MinMaxAvgScalarEventBatchStreamFromFrames { inp: frames }; debug!("x_processed_stream_from_node HAVE STREAM INSTANCE"); let s3: Pin> + Send>> = Box::pin(s2); debug!("x_processed_stream_from_node RETURN"); Ok(s3) } -pub struct MinMaxAvgScalarEventBatchStreamFromTcp { - inp: OwnedReadHalf, +pub struct MinMaxAvgScalarEventBatchStreamFromFrames +where + T: AsyncRead + Unpin, +{ + inp: InMemoryFrameAsyncReadStream, } -impl Stream for MinMaxAvgScalarEventBatchStreamFromTcp { +impl Stream for MinMaxAvgScalarEventBatchStreamFromFrames +where + T: AsyncRead + Unpin, +{ type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; loop { - // TODO make capacity configurable. - // TODO reuse buffer if not full. - let mut buf = BytesMut::with_capacity(1024 * 2); - let mut buf2 = ReadBuf::new(buf.as_mut()); let j = &mut self.inp; pin_mut!(j); - break match AsyncRead::poll_read(j, cx, &mut buf2) { - Ready(Ok(_)) => { - if buf.len() == 0 { - Ready(None) - } else { - error!("got input from remote {} bytes", buf.len()); - Ready(Some(Ok(err::todoval()))) - } + break match j.poll_next(cx) { + Ready(Some(Ok(buf))) => { + info!( + "MinMaxAvgScalarEventBatchStreamFromFrames got full frame buf {}", + buf.len() + ); + let item = MinMaxAvgScalarEventBatch::from_full_frame(&buf); + Ready(Some(Ok(item))) } - Ready(Err(e)) => Ready(Some(Err(e.into()))), + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => Ready(None), Pending => Pending, }; } @@ -92,13 +96,19 @@ Interprets a byte stream as length-delimited frames. Emits each frame as a single item. Therefore, each item must fit easily into memory. */ -pub struct InMemoryFrameAsyncReadStream { +pub struct InMemoryFrameAsyncReadStream +where + T: AsyncRead + Unpin, +{ inp: T, buf: BytesMut, wp: usize, } -impl InMemoryFrameAsyncReadStream { +impl InMemoryFrameAsyncReadStream +where + T: AsyncRead + Unpin, +{ fn new(inp: T) -> Self { // TODO make start cap adjustable let mut buf = BytesMut::with_capacity(1024); @@ -232,7 +242,16 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr) -> Result<(), Err warn!("TODO decide on response content based on the parsed json query"); warn!("raw_conn_handler INPUT STREAM END"); - let mut s1 = futures_util::stream::iter(vec![MinMaxAvgScalarEventBatch::empty()]); + let mut batch = MinMaxAvgScalarEventBatch::empty(); + batch.tss.push(42); + batch.tss.push(43); + batch.mins.push(7.1); + batch.mins.push(7.2); + batch.maxs.push(8.3); + batch.maxs.push(8.4); + batch.avgs.push(9.5); + batch.avgs.push(9.6); + let mut s1 = futures_util::stream::iter(vec![batch]); while let Some(item) = s1.next().await { let fr = item.serialized(); netout.write_u32_le(fr.len() as u32).await?; diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 506f15b..c3bbdf5 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -66,7 +66,7 @@ async fn get_cached_0_inner() -> Result<(), Error> { loop { match res_body.data().await { Some(Ok(k)) => { - //info!("packet.. len {}", k.len()); + info!("packet.. len {}", k.len()); ntot += k.len() as u64; } Some(Err(e)) => {