diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 31c7d20..881da06 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -1,3 +1,7 @@ +/*! +Aggregation and binning support. +*/ + use crate::EventFull; use err::Error; use futures_core::Stream; @@ -30,13 +34,14 @@ pub trait AggregatableTdim { fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator; } -// dummy +/// DO NOT USE. This is just a dummy for some testing. impl AggregatableXdim1Bin for () { type Output = (); fn into_agg(self) -> Self::Output { todo!() } } +/// DO NOT USE. This is just a dummy for some testing. impl AggregatableTdim for () { type Output = (); type Aggregator = (); @@ -44,6 +49,7 @@ impl AggregatableTdim for () { todo!() } } +/// DO NOT USE. This is just a dummy for some testing. impl AggregatorTdim for () { type InputValue = (); type OutputValue = (); @@ -68,6 +74,7 @@ impl AggregatorTdim for () { } } +/// Batch of events with a scalar (zero dimensions) numeric value. pub struct ValuesDim0 { tss: Vec, values: Vec>, @@ -124,6 +131,7 @@ impl AggregatableXdim1Bin for ValuesDim1 { } } +/// Batch of events with a numeric one-dimensional (i.e. array) value. pub struct ValuesDim1 { pub tss: Vec, pub values: Vec>, diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 2f5e25b..7dbadd1 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,4 +1,5 @@ use crate::agg::MinMaxAvgScalarBinBatch; +use crate::raw::EventsQuery; use bytes::{BufMut, Bytes, BytesMut}; use chrono::{DateTime, Utc}; use err::Error; @@ -283,8 +284,18 @@ impl PreBinnedValueStream { // set up merger // set up T-binning // save to cache file if input is complete - - todo!(); + let evq = EventsQuery { + channel: self.channel.clone(), + range: NanoRange { + beg: self.patch_coord.patch_beg(), + end: self.patch_coord.patch_end(), + }, + agg_kind: self.agg_kind.clone(), + }; + self.fut2 = Some(Box::pin(PreBinnedAssembledFromRemotes::new( + evq, + &self.node_config.cluster, + ))); } } } @@ -415,6 +426,27 @@ impl Stream for PreBinnedValueFetchedStream { } } +pub struct PreBinnedAssembledFromRemotes {} + +impl PreBinnedAssembledFromRemotes { + pub fn new(evq: EventsQuery, cluster: &Cluster) -> Self { + err::todoval() + } +} + +impl Stream for PreBinnedAssembledFromRemotes { + // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + // TODO this has several stages: + // First, establish async all connections. + // Then assemble the merge-and-processing-pipeline and pull from there. + err::todoval() + } +} + pub struct BinnedStream { inp: Pin> + Send>>, } diff --git a/disk/src/raw.rs b/disk/src/raw.rs index fa7ae4d..9aeaede 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -1,75 +1,172 @@ -/* -Provide ser/de of value data to a good net exchange format. +/*! +Delivers event data. + +Delivers event data (not yet time-binned) from local storage and provides client functions +to request such data from nodes. */ -use crate::agg::MinMaxAvgScalarBinBatch; +use crate::agg::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; +use bytes::{Bytes, BytesMut}; use err::Error; use futures_core::Stream; -use netpod::Node; +use futures_util::pin_mut; +use netpod::{AggKind, Channel, NanoRange, Node, NodeConfig}; +use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; +use tokio::net::TcpStream; +#[allow(unused_imports)] +use tracing::{debug, error, info, trace, warn}; + +/** +Query parameters to request (optionally) X-processed, but not T-processed events. +*/ +#[derive(Debug, Serialize, Deserialize)] +pub struct EventsQuery { + pub channel: Channel, + pub range: NanoRange, + pub agg_kind: AggKind, +} pub async fn x_processed_stream_from_node( + query: &EventsQuery, node: &Node, -) -> Result>>>, Error> { - // TODO can I factor this better? - // Need a stream of bytes, and a deserializer from stream of bytes to stream of items. - // Need to pass the parameters to upstream. - - let netin = tokio::net::TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; - - // TODO TcpStream is not yet a Stream! - - //let s2: Pin>>> = Box::pin(netin); - - err::todoval() +) -> Result>>>, Error> { + let mut net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; + let qjs = serde_json::to_vec(query)?; + net.write_u32_le(qjs.len() as u32).await?; + net.write_all(&qjs).await?; + net.flush().await?; + let s2 = MinMaxAvgScalarEventBatchStreamFromTcp { inp: net }; + let s3: Pin>>> = Box::pin(s2); + Ok(s3) } -pub struct MinMaxAvgScalarBinBatchStreamFromByteStream { - //inp: TcpStream, +pub struct MinMaxAvgScalarEventBatchStreamFromTcp { + inp: TcpStream, } -impl Stream for MinMaxAvgScalarBinBatchStreamFromByteStream { - type Item = Result; +impl Stream for MinMaxAvgScalarEventBatchStreamFromTcp { + type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - err::todoval() + 'outer: loop { + // TODO make capacity configurable. + // TODO reuse buffer if not full. + let mut buf = BytesMut::with_capacity(1024 * 2); + let mut buf2 = ReadBuf::new(buf.as_mut()); + let j = &mut self.inp; + pin_mut!(j); + break match AsyncRead::poll_read(j, cx, &mut buf2) { + Ready(Ok(_)) => { + if buf.len() == 0 { + Ready(None) + } else { + error!("got input from remote {} bytes", buf.len()); + Ready(Some(Ok(err::todoval()))) + } + } + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => Pending, + }; + } } } +/** +Interprets a byte stream as length-delimited frames. + +Emits each frame as a single item. Therefore, each item must fit easily into memory. +*/ +pub struct InMemoryFrameAsyncReadStream { + inp: T, + buf: Option, +} + +impl InMemoryFrameAsyncReadStream { + fn new(inp: T) -> Self { + Self { + inp, + // TODO make start cap adjustable + buf: Some(BytesMut::with_capacity(1024)), + } + } +} + +impl Stream for InMemoryFrameAsyncReadStream +where + T: AsyncRead + Unpin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + let mut buf0 = self.buf.take().unwrap(); + let mut buf2 = ReadBuf::new(buf0.as_mut()); + assert!(buf2.capacity() > 0); + assert!(buf2.remaining() > 0); + let r1 = buf2.remaining(); + let j = &mut self.inp; + pin_mut!(j); + break match AsyncRead::poll_read(j, cx, &mut buf2) { + Ready(Ok(())) => { + if buf2.remaining() == r1 { + // TODO re-init self.buf ? + // TODO end of input. + err::todoval() + } else { + // TODO re-init self.buf ? + // TODO how to reflect the write position in the underlying BytesMut??? + err::todoval() + } + } + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => Pending, + }; + } + } +} + +// TODO build a stream from disk data to batched event data. #[allow(dead_code)] async fn local_unpacked_test() { - // TODO what kind of query format? What information do I need here? - // Don't need exact details of channel because I need to parse the databuffer config anyway. - - /*let query = netpod::AggQuerySingleChannel { - channel_config: ChannelConfig { - channel: Channel { - backend: "ks".into(), - name: "wave1".into(), - }, - keyspace: 3, - time_bin_size: DAY, - shape: Shape::Wave(17), - scalar_type: ScalarType::F64, - big_endian: true, - compression: true, - }, - timebin: 0, - tb_file_count: 1, - buffer_size: 1024 * 8, - };*/ - let query = err::todoval(); let node = err::todoval(); - - // TODO generate channel configs for my test data. - // TODO open and parse the channel config. - // TODO find the matching config entry. (bonus: fuse consecutive compatible entries) - use crate::agg::IntoDim1F32Stream; let _stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream(); } + +/** +Can be serialized as a length-delimited frame. +*/ +pub trait Frameable {} + +pub async fn raw_service(node_config: Arc) -> Result<(), Error> { + let addr = format!("0.0.0.0:{}", node_config.node.port_raw); + let lis = tokio::net::TcpListener::bind(addr).await?; + loop { + match lis.accept().await { + Ok((stream, addr)) => { + taskrun::spawn(raw_conn_handler(stream, addr)); + } + Err(e) => Err(e)?, + } + } +} + +async fn raw_conn_handler(mut stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { + info!("RAW HANDLER for {:?}", addr); + let (netin, mut netout) = stream.into_split(); + InMemoryFrameAsyncReadStream::new(netin); + netout.write_i32_le(123).await?; + netout.flush().await?; + netout.forget(); + Ok(()) +} diff --git a/err/src/lib.rs b/err/src/lib.rs index e76b291..6bf24aa 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -1,9 +1,16 @@ +/*! +Error handling and reporting. +*/ + use nom::error::ErrorKind; use std::fmt::Debug; use std::num::ParseIntError; use std::string::FromUtf8Error; use tokio::task::JoinError; +/** +The common error type for this application. +*/ pub struct Error { msg: String, trace: backtrace::Backtrace, @@ -20,13 +27,34 @@ impl Error { impl std::fmt::Debug for Error { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(fmt, "Error {} backtrace:\n{:?}", self.msg, self.trace) + use std::io::Write; + let mut buf = vec![]; + for fr in self.trace.frames() { + for sy in fr.symbols() { + let is_ours = match sy.filename() { + None => false, + Some(s) => s.to_str().unwrap().contains("dev/daqbuffer"), + }; + if is_ours { + write!( + &mut buf, + "\n {}\n {} {}", + sy.name().unwrap(), + sy.filename().unwrap().to_str().unwrap(), + sy.lineno().unwrap(), + ) + .unwrap(); + } + } + } + //write!(fmt, "Error {} backtrace:\n{:?}", self.msg, self.trace) + write!(fmt, "Error {} trace{}", self.msg, String::from_utf8(buf).unwrap()) } } impl std::fmt::Display for Error { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(fmt, "Error {} backtrace:\n{:?}", self.msg, self.trace) + std::fmt::Debug::fmt(self, fmt) } } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 1a9167f..dbb8ac2 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -14,13 +14,11 @@ use pin::Pin; use std::{future, net, panic, pin, sync, task}; use sync::Arc; use task::{Context, Poll}; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpStream; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; pub async fn host(node_config: Arc) -> Result<(), Error> { - let rawjh = taskrun::spawn(raw_service(node_config.clone())); + let rawjh = taskrun::spawn(disk::raw::raw_service(node_config.clone())); let addr = SocketAddr::from(([0, 0, 0, 0], node_config.node.port)); let make_service = make_service_fn({ move |conn| { @@ -240,23 +238,3 @@ async fn prebinned(req: Request, node_config: Arc) -> Result) -> Result<(), Error> { - let addr = format!("0.0.0.0:{}", node_config.node.port_raw); - let lis = tokio::net::TcpListener::bind(addr).await?; - loop { - match lis.accept().await { - Ok((stream, addr)) => { - taskrun::spawn(raw_conn_handler(stream, addr)); - } - Err(e) => Err(e)?, - } - } -} - -async fn raw_conn_handler(mut stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { - info!("RAW HANDLER for {:?}", addr); - stream.write_i32_le(123).await?; - stream.flush().await?; - Ok(()) -} diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 52aa172..3d4b6ec 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -38,13 +38,15 @@ async fn get_cached_0_inner() -> Result<(), Error> { let hosts = spawn_test_hosts(cluster.clone()); let beg_date: chrono::DateTime = "1970-01-01T00:00:10.000Z".parse()?; let end_date: chrono::DateTime = "1970-01-01T00:00:51.000Z".parse()?; - let channel = "wave1"; - let date_fmt = "%Y-%m-%dT%H:%M:%S%.3fZ"; + let channel_backend = "back"; + let channel_name = "wave1"; + let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let uri = format!( - "http://{}:{}/api/1/binned?channel_backend=testbackend&channel_name={}&bin_count=4&beg_date={}&end_date={}", + "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&bin_count=4&beg_date={}&end_date={}", node0.host, node0.port, - channel, + channel_backend, + channel_name, beg_date.format(date_fmt), end_date.format(date_fmt), ); diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 89e9b3f..8ae63cb 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -16,7 +16,7 @@ pub fn run>>(f: F) -> Result panic::set_hook(Box::new(move |info| { error!( "✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}", - backtrace::Backtrace::new(), + Error::with_msg("catched panic in taskrun::run"), info.location(), info.payload() );