diff --git a/crates/disk/Cargo.toml b/crates/disk/Cargo.toml index 14e565e..70797db 100644 --- a/crates/disk/Cargo.toml +++ b/crates/disk/Cargo.toml @@ -39,5 +39,6 @@ parse = { path = "../parse" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } streams = { path = "../streams" } +streamio = { path = "../streamio" } httpclient = { path = "../httpclient" } bitshuffle = { path = "../bitshuffle" } diff --git a/crates/disk/src/merge/mergedblobsfromremotes.rs b/crates/disk/src/merge/mergedblobsfromremotes.rs index 16fecc6..4ff5c5e 100644 --- a/crates/disk/src/merge/mergedblobsfromremotes.rs +++ b/crates/disk/src/merge/mergedblobsfromremotes.rs @@ -15,7 +15,7 @@ use std::future::Future; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use streams::tcprawclient::x_processed_event_blobs_stream_from_node; +use streamio::tcprawclient::x_processed_event_blobs_stream_from_node; type T001 = Pin> + Send>>; type T002 = Pin, items_0::streamitem::SitemErrTy>> + Send>>; diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index da0e6da..b183779 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -37,6 +37,7 @@ items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } streams = { path = "../streams" } +streamio = { path = "../streamio" } nodenet = { path = "../nodenet" } commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index bf2b806..81b6d72 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -266,10 +266,18 @@ async fn binned_json_framed( let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc); - let stream = - streams::timebinnedjson::timebinned_json_framed(query, ch_conf, ctx, cache_read_provider, events_read_provider) - .instrument(span1) - .await?; + let stream_timeout = streamio::streamtimeout::StreamTimeout::new(); + let stream_timeout = Box::new(stream_timeout); + let stream = streams::timebinnedjson::timebinned_json_framed( + query, + ch_conf, + ctx, + cache_read_provider, + events_read_provider, + stream_timeout, + ) + .instrument(span1) + .await?; let stream = bytes_chunks_to_len_framed_str(stream); let ret = response(StatusCode::OK) .header(CONTENT_TYPE, APP_JSON_FRAMED) diff --git a/crates/nodenet/Cargo.toml b/crates/nodenet/Cargo.toml index d377baf..1ebef77 100644 --- a/crates/nodenet/Cargo.toml +++ b/crates/nodenet/Cargo.toml @@ -28,4 +28,5 @@ dbconn = { path = "../dbconn" } scyllaconn = { path = "../scyllaconn" } taskrun = { path = "../taskrun" } streams = { path = "../streams" } +streamio = { path = "../streamio" } httpclient = { path = "../httpclient" } diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 93af134..03462eb 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -27,9 +27,9 @@ use query::api4::events::Frame1Parts; use scyllaconn::worker::ScyllaQueue; use std::net::SocketAddr; use std::pin::Pin; +use streamio::tcpreadasbytes::TcpReadAsBytes; use streams::frames::inmem::BoxedBytesStream; use streams::frames::inmem::InMemoryFrameStream; -use streams::frames::inmem::TcpReadAsBytes; use streams::tcprawclient::TEST_BACKEND; use streams::transform::build_event_transform; use taskrun::tokio; diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index a808fc9..bf4195b 100644 --- a/crates/nodenet/src/conn/test.rs +++ b/crates/nodenet/src/conn/test.rs @@ -32,8 +32,8 @@ use query::api4::events::EventsSubQuery; use query::api4::events::EventsSubQuerySelect; use query::api4::events::EventsSubQuerySettings; use query::transform::TransformQuery; +use streamio::tcpreadasbytes::TcpReadAsBytes; use streams::frames::inmem::InMemoryFrameStream; -use streams::frames::inmem::TcpReadAsBytes; use taskrun::tokio; use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; diff --git a/crates/streamio/Cargo.toml b/crates/streamio/Cargo.toml new file mode 100644 index 0000000..7d0cc3d --- /dev/null +++ b/crates/streamio/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "streamio" +version = "0.0.2" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] } +tokio-stream = "0.1.16" +futures-util = "0.3.15" +pin-project = "1.0.12" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_cbor = "0.11.1" +typetag = "0.2.14" +ciborium = "0.2.1" +bytes = "1.6" +arrayref = "0.3.6" +crc32fast = "1.3.2" +byteorder = "1.4.3" +async-channel = "1.9.0" +rand_xoshiro = "0.6.0" +thiserror = "0.0.1" +chrono = { version = "0.4.19", features = ["serde"] } +wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true } +netpod = { path = "../netpod" } +query = { path = "../query" } +items_0 = { path = "../items_0" } +items_2 = { path = "../items_2" } +parse = { path = "../parse" } +streams = { path = "../streams" } +http = "1" +http-body = "1" +http-body-util = "0.1.0" + +[dev-dependencies] +taskrun = { path = "../taskrun" } + +[features] +wasm_transform = ["wasmer"] + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/streamio/src/lib.rs b/crates/streamio/src/lib.rs new file mode 100644 index 0000000..f864227 --- /dev/null +++ b/crates/streamio/src/lib.rs @@ -0,0 +1,3 @@ +pub mod streamtimeout; +pub mod tcprawclient; +pub mod tcpreadasbytes; diff --git a/crates/streamio/src/streamtimeout.rs b/crates/streamio/src/streamtimeout.rs new file mode 100644 index 0000000..a195191 --- /dev/null +++ b/crates/streamio/src/streamtimeout.rs @@ -0,0 +1,23 @@ +use futures_util::Stream; +use std::pin::Pin; +use streams::streamtimeout::TimeoutableStream; + +pub struct StreamTimeout {} + +impl StreamTimeout { + pub fn new() -> Self { + Self {} + } +} + +impl streams::streamtimeout::StreamTimeout for StreamTimeout { + fn timeout_intervals(&self, inp: Pin + Send>>) -> Pin + Send>> { + todo!() + } +} + +impl streams::streamtimeout::StreamTimeout2 for StreamTimeout { + fn timeout_intervals(&self, inp: S) -> TimeoutableStream { + todo!() + } +} diff --git a/crates/streamio/src/tcprawclient.rs b/crates/streamio/src/tcprawclient.rs new file mode 100644 index 0000000..03b78d2 --- /dev/null +++ b/crates/streamio/src/tcprawclient.rs @@ -0,0 +1,111 @@ +use crate::tcpreadasbytes::TcpReadAsBytes; +use futures_util::Stream; +use futures_util::TryStreamExt; +use items_0::framable::FrameTypeInnerStatic; +use items_0::streamitem::sitem_data; +use items_0::streamitem::sitem_err2_from_string; +use items_0::streamitem::Sitemty; +use items_2::eventfull::EventFull; +use items_2::framable::Framable; +use items_2::frame::make_term_frame; +use netpod::log::*; +use netpod::Cluster; +use netpod::Node; +use netpod::ReqCtx; +use query::api4::events::EventsSubQuery; +use serde::de::DeserializeOwned; +use std::fmt; +use std::pin::Pin; +use streams::frames::eventsfromframes::EventsFromFrames; +use streams::frames::inmem::BoxedBytesStream; +use streams::frames::inmem::InMemoryFrameStream; +use streams::tcprawclient::make_node_command_frame; +use streams::tcprawclient::x_processed_event_blobs_stream_from_node_http; +use streams::tcprawclient::HttpSimplePost; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "TcpRawClient")] +pub enum Error { + IO(#[from] std::io::Error), + Frame(#[from] items_2::frame::Error), + Framable(#[from] items_2::framable::Error), + StreamsRawClient(#[from] streams::tcprawclient::Error), +} + +pub type BoxedStream = Pin> + Send>>; + +pub async fn x_processed_event_blobs_stream_from_node_tcp( + subq: EventsSubQuery, + node: Node, +) -> Result> + Send>>, Error> { + let addr = format!("{}:{}", node.host, node.port_raw); + debug!("x_processed_event_blobs_stream_from_node to: {addr}",); + let frame1 = make_node_command_frame(subq.clone())?; + let net = TcpStream::connect(addr.clone()).await?; + let (netin, mut netout) = net.into_split(); + let item = sitem_data(frame1); + let buf = item.make_frame_dyn()?; + netout.write_all(&buf).await?; + let buf = make_term_frame()?; + netout.write_all(&buf).await?; + netout.flush().await?; + netout.forget(); + let inp = TcpReadAsBytes::new(netin).map_err(sitem_err2_from_string); + let inp = Box::pin(inp) as BoxedBytesStream; + let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); + let frames = frames.map_err(sitem_err2_from_string); + let frames = Box::pin(frames); + let items = EventsFromFrames::new(frames, addr); + Ok(Box::pin(items)) +} + +#[allow(unused)] +async fn open_event_data_streams_tcp(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> +where + // TODO group bounds in new trait + T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, +{ + // TODO when unit tests established, change to async connect: + let frame1 = make_node_command_frame(subq.clone())?; + let mut streams = Vec::new(); + for node in &cluster.nodes { + let addr = format!("{}:{}", node.host, node.port_raw); + debug!("open_tcp_streams to: {addr}"); + let net = TcpStream::connect(addr.clone()).await?; + let (netin, mut netout) = net.into_split(); + let item = sitem_data(frame1.clone()); + let buf = item.make_frame_dyn()?; + netout.write_all(&buf).await?; + let buf = make_term_frame()?; + netout.write_all(&buf).await?; + netout.flush().await?; + netout.forget(); + // TODO for images, we need larger buffer capacity + let inp = TcpReadAsBytes::new(netin); + let inp = inp.map_err(sitem_err2_from_string); + let inp = Box::pin(inp) as BoxedBytesStream; + let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); + let frames = frames.map_err(sitem_err2_from_string); + let frames = Box::pin(frames); + let stream = EventsFromFrames::::new(frames, addr); + streams.push(Box::pin(stream) as _); + } + Ok(streams) +} + +// Currently used only for the python data api3 protocol endpoint. +// TODO merge with main method. +pub async fn x_processed_event_blobs_stream_from_node( + subq: EventsSubQuery, + node: Node, + post: Box, + ctx: ReqCtx, +) -> Result> + Send>>, Error> { + if true { + Ok(x_processed_event_blobs_stream_from_node_http(subq, node, post, &ctx).await?) + } else { + x_processed_event_blobs_stream_from_node_tcp(subq, node).await + } +} diff --git a/crates/streamio/src/tcpreadasbytes.rs b/crates/streamio/src/tcpreadasbytes.rs new file mode 100644 index 0000000..916439c --- /dev/null +++ b/crates/streamio/src/tcpreadasbytes.rs @@ -0,0 +1,49 @@ +use bytes::Bytes; +use futures_util::Stream; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncRead; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "TcpReadAsBytes")] +pub enum Error { + IO(#[from] std::io::Error), +} + +pub struct TcpReadAsBytes { + inp: INP, +} + +impl TcpReadAsBytes { + pub fn new(inp: INP) -> Self { + Self { inp } + } +} + +impl Stream for TcpReadAsBytes +where + INP: AsyncRead + Unpin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut buf1 = vec![0; 512]; + let mut buf2 = tokio::io::ReadBuf::new(&mut buf1); + match tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inp), cx, &mut buf2) { + Ready(Ok(())) => { + let n = buf2.filled().len(); + if n == 0 { + Ready(None) + } else { + buf1.truncate(n); + let item = Bytes::from(buf1); + Ready(Some(Ok(item))) + } + } + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => Pending, + } + } +} diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 90fd47c..9a2613a 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -5,10 +5,10 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -tokio = { version = "1.34", features = ["io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] } +tokio-stream = "0.1.16" futures-util = "0.3.15" pin-project = "1.0.12" -tokio-stream = "0.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" diff --git a/crates/streams/src/frames/inmem.rs b/crates/streams/src/frames/inmem.rs index 66c5167..1e275e2 100644 --- a/crates/streams/src/frames/inmem.rs +++ b/crates/streams/src/frames/inmem.rs @@ -35,44 +35,6 @@ pub type BoxedBytesStream = Pin> macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ); } -pub struct TcpReadAsBytes { - inp: INP, -} - -impl TcpReadAsBytes { - pub fn new(inp: INP) -> Self { - Self { inp } - } -} - -impl Stream for TcpReadAsBytes -where - INP: AsyncRead + Unpin, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - // TODO keep this small as long as InMemoryFrameStream uses SlideBuf internally. - let mut buf1 = vec![0; 128]; - let mut buf2 = tokio::io::ReadBuf::new(&mut buf1); - match tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inp), cx, &mut buf2) { - Ready(Ok(())) => { - let n = buf2.filled().len(); - if n == 0 { - Ready(None) - } else { - buf1.truncate(n); - let item = Bytes::from(buf1); - Ready(Some(Ok(item))) - } - } - Ready(Err(e)) => Ready(Some(Err(e.into()))), - Pending => Pending, - } - } -} - /// Interprets a byte stream as length-delimited frames. /// /// Emits each frame as a single item. Therefore, each item must fit easily into memory. diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index e026435..9cacafd 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -20,6 +20,7 @@ pub mod plaineventsstream; pub mod print_on_done; pub mod rangefilter2; pub mod slidebuf; +pub mod streamtimeout; pub mod tcprawclient; #[cfg(test)] pub mod test; diff --git a/crates/streams/src/streamtimeout.rs b/crates/streams/src/streamtimeout.rs new file mode 100644 index 0000000..c7daac4 --- /dev/null +++ b/crates/streams/src/streamtimeout.rs @@ -0,0 +1,34 @@ +use futures_util::Stream; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct TimeoutableStream { + _t1: PhantomData, +} + +impl TimeoutableStream { + fn new() -> Self { + Self { _t1: PhantomData } + } +} + +impl Stream for TimeoutableStream +where + S: Stream, +{ + type Item = Option<::Item>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } +} + +pub trait StreamTimeout: Send { + fn timeout_intervals(&self, inp: Pin + Send>>) -> Pin + Send>>; +} + +pub trait StreamTimeout2: Send { + fn timeout_intervals(&self, inp: S) -> TimeoutableStream; +} diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index 466cb1e..c17b9f3 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -1,12 +1,6 @@ -//! Delivers event data. -//! -//! Delivers event data (not yet time-binned) from local storage and provides client functions -//! to request such data from nodes. - use crate::frames::eventsfromframes::EventsFromFrames; use crate::frames::inmem::BoxedBytesStream; use crate::frames::inmem::InMemoryFrameStream; -use crate::frames::inmem::TcpReadAsBytes; use bytes::Bytes; use bytes::BytesMut; use futures_util::Future; @@ -22,12 +16,10 @@ use items_0::streamitem::Sitemty; use items_2::eventfull::EventFull; use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; -use items_2::frame::make_term_frame; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::ByteSize; use netpod::ChannelTypeConfigGen; -use netpod::Cluster; use netpod::Node; use netpod::ReqCtx; use netpod::APP_OCTET; @@ -40,8 +32,6 @@ use serde::de::DeserializeOwned; use std::fmt; use std::pin::Pin; use std::sync::Arc; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpStream; pub const TEST_BACKEND: &str = "testbackend-00"; @@ -97,31 +87,6 @@ pub fn make_node_command_frame(query: EventsSubQuery) -> Result Result> + Send>>, Error> { - let addr = format!("{}:{}", node.host, node.port_raw); - debug!("x_processed_event_blobs_stream_from_node to: {addr}",); - let frame1 = make_node_command_frame(subq.clone())?; - let net = TcpStream::connect(addr.clone()).await?; - let (netin, mut netout) = net.into_split(); - let item = sitem_data(frame1); - let buf = item.make_frame_dyn()?; - netout.write_all(&buf).await?; - let buf = make_term_frame()?; - netout.write_all(&buf).await?; - netout.flush().await?; - netout.forget(); - let inp = TcpReadAsBytes::new(netin).map_err(sitem_err2_from_string); - let inp = Box::pin(inp) as BoxedBytesStream; - let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); - let frames = frames.map_err(sitem_err2_from_string); - let frames = Box::pin(frames); - let items = EventsFromFrames::new(frames, addr); - Ok(Box::pin(items)) -} - #[derive(Debug, thiserror::Error)] pub enum ErrorBody { #[error("{0}")] @@ -202,57 +167,6 @@ pub async fn x_processed_event_blobs_stream_from_node_http( Ok(Box::pin(stream)) } -// Currently used only for the python data api3 protocol endpoint. -// TODO merge with main method. -pub async fn x_processed_event_blobs_stream_from_node( - subq: EventsSubQuery, - node: Node, - post: Box, - ctx: ReqCtx, -) -> Result> + Send>>, Error> { - if true { - x_processed_event_blobs_stream_from_node_http(subq, node, post, &ctx).await - } else { - x_processed_event_blobs_stream_from_node_tcp(subq, node).await - } -} - -pub type BoxedStream = Pin> + Send>>; - -#[allow(unused)] -async fn open_event_data_streams_tcp(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> -where - // TODO group bounds in new trait - T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, -{ - // TODO when unit tests established, change to async connect: - let frame1 = make_node_command_frame(subq.clone())?; - let mut streams = Vec::new(); - for node in &cluster.nodes { - let addr = format!("{}:{}", node.host, node.port_raw); - debug!("open_tcp_streams to: {addr}"); - let net = TcpStream::connect(addr.clone()).await?; - let (netin, mut netout) = net.into_split(); - let item = sitem_data(frame1.clone()); - let buf = item.make_frame_dyn()?; - netout.write_all(&buf).await?; - let buf = make_term_frame()?; - netout.write_all(&buf).await?; - netout.flush().await?; - netout.forget(); - // TODO for images, we need larger buffer capacity - let inp = TcpReadAsBytes::new(netin); - let inp = inp.map_err(sitem_err2_from_string); - let inp = Box::pin(inp) as BoxedBytesStream; - let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); - let frames = frames.map_err(sitem_err2_from_string); - let frames = Box::pin(frames); - let stream = EventsFromFrames::::new(frames, addr); - streams.push(Box::pin(stream) as _); - } - Ok(streams) -} - pub fn container_stream_from_bytes_stream( inp: BoxedBytesStream, bufcap: ByteSize, diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index f066f52..3e42e8b 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -3,6 +3,8 @@ use crate::collect::CollectResult; use crate::json_stream::JsonBytes; use crate::json_stream::JsonStream; use crate::rangefilter2::RangeFilter2; +use crate::streamtimeout::StreamTimeout; +use crate::streamtimeout::StreamTimeout2; use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; @@ -364,6 +366,7 @@ pub async fn timebinned_json_framed( ctx: &ReqCtx, cache_read_provider: Arc, events_read_provider: Arc, + stream_timeout_provider: Box>>, ) -> Result { trace!("timebinned_json_framed"); let binned_range = query.covering_range()?;