This commit is contained in:
Dominik Werder
2024-11-06 16:29:28 +01:00
parent 1364f43f6f
commit aa126888ab
18 changed files with 287 additions and 133 deletions

View File

@@ -39,5 +39,6 @@ parse = { path = "../parse" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
streams = { path = "../streams" }
streamio = { path = "../streamio" }
httpclient = { path = "../httpclient" }
bitshuffle = { path = "../bitshuffle" }

View File

@@ -15,7 +15,7 @@ use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use streams::tcprawclient::x_processed_event_blobs_stream_from_node;
use streamio::tcprawclient::x_processed_event_blobs_stream_from_node;
/// Pinned, boxed, `Send` stream whose items are `Sitemty<T>`.
type T001<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
/// Pinned, boxed, `Send` future that resolves to either a `T001<T>` stream or an
/// `items_0::streamitem::SitemErrTy` error.
type T002<T> = Pin<Box<dyn Future<Output = Result<T001<T>, items_0::streamitem::SitemErrTy>> + Send>>;

View File

@@ -37,6 +37,7 @@ items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
parse = { path = "../parse" }
streams = { path = "../streams" }
streamio = { path = "../streamio" }
nodenet = { path = "../nodenet" }
commonio = { path = "../commonio" }
taskrun = { path = "../taskrun" }

View File

@@ -266,10 +266,18 @@ async fn binned_json_framed(
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc);
let stream =
streams::timebinnedjson::timebinned_json_framed(query, ch_conf, ctx, cache_read_provider, events_read_provider)
.instrument(span1)
.await?;
let stream_timeout = streamio::streamtimeout::StreamTimeout::new();
let stream_timeout = Box::new(stream_timeout);
let stream = streams::timebinnedjson::timebinned_json_framed(
query,
ch_conf,
ctx,
cache_read_provider,
events_read_provider,
stream_timeout,
)
.instrument(span1)
.await?;
let stream = bytes_chunks_to_len_framed_str(stream);
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_JSON_FRAMED)

View File

@@ -28,4 +28,5 @@ dbconn = { path = "../dbconn" }
scyllaconn = { path = "../scyllaconn" }
taskrun = { path = "../taskrun" }
streams = { path = "../streams" }
streamio = { path = "../streamio" }
httpclient = { path = "../httpclient" }

View File

@@ -27,9 +27,9 @@ use query::api4::events::Frame1Parts;
use scyllaconn::worker::ScyllaQueue;
use std::net::SocketAddr;
use std::pin::Pin;
use streamio::tcpreadasbytes::TcpReadAsBytes;
use streams::frames::inmem::BoxedBytesStream;
use streams::frames::inmem::InMemoryFrameStream;
use streams::frames::inmem::TcpReadAsBytes;
use streams::tcprawclient::TEST_BACKEND;
use streams::transform::build_event_transform;
use taskrun::tokio;

View File

@@ -32,8 +32,8 @@ use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
use query::api4::events::EventsSubQuerySettings;
use query::transform::TransformQuery;
use streamio::tcpreadasbytes::TcpReadAsBytes;
use streams::frames::inmem::InMemoryFrameStream;
use streams::frames::inmem::TcpReadAsBytes;
use taskrun::tokio;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;

View File

@@ -0,0 +1,43 @@
# Manifest of the `streamio` crate.
[package]
name = "streamio"
version = "0.0.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] }
tokio-stream = "0.1.16"
futures-util = "0.3.15"
pin-project = "1.0.12"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.1"
typetag = "0.2.14"
ciborium = "0.2.1"
bytes = "1.6"
arrayref = "0.3.6"
crc32fast = "1.3.2"
byteorder = "1.4.3"
async-channel = "1.9.0"
rand_xoshiro = "0.6.0"
# Overridden by the git fork listed under [patch.crates-io] at the end of this manifest.
# NOTE(review): "0.0.1" is presumably chosen to match the version declared by that fork — confirm.
thiserror = "0.0.1"
chrono = { version = "0.4.19", features = ["serde"] }
# Optional: only enabled through the `wasm_transform` feature below.
wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true }
# Sibling crates referenced by relative path.
netpod = { path = "../netpod" }
query = { path = "../query" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
parse = { path = "../parse" }
streams = { path = "../streams" }
http = "1"
http-body = "1"
http-body-util = "0.1.0"
[dev-dependencies]
taskrun = { path = "../taskrun" }
[features]
wasm_transform = ["wasmer"]
# NOTE(review): cargo only applies [patch] sections from the manifest at the workspace root;
# confirm that this crate is built as its own root, otherwise this override has no effect here.
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }

View File

@@ -0,0 +1,3 @@
pub mod streamtimeout;
pub mod tcprawclient;
pub mod tcpreadasbytes;

View File

@@ -0,0 +1,23 @@
use futures_util::Stream;
use std::pin::Pin;
use streams::streamtimeout::TimeoutableStream;
/// Stateless provider that implements the `streams::streamtimeout::StreamTimeout` and
/// `streams::streamtimeout::StreamTimeout2` traits for this crate.
///
/// The type has no fields, so every instance is interchangeable. `Default` and `Debug` are
/// derived so that the type can be embedded in other derived types and so that the zero-argument
/// constructor has a matching `Default` implementation.
#[derive(Debug, Default)]
pub struct StreamTimeout {}

impl StreamTimeout {
    /// Creates a new provider. Equivalent to `StreamTimeout::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
// Boxed-stream flavour of the timeout provider: takes a pinned, boxed, `Send` stream of `S`
// and returns a stream of the same type.
//
// Not implemented yet: the body is `todo!()`, so any call panics.
// NOTE(review): `StreamTimeout2` is implemented for the same type with the same method name;
// with both traits in scope a plain `x.timeout_intervals(..)` call on the concrete type needs
// disambiguation.
impl<S> streams::streamtimeout::StreamTimeout<S> for StreamTimeout {
// `inp` is not used yet.
fn timeout_intervals(&self, inp: Pin<Box<dyn Stream<Item = S> + Send>>) -> Pin<Box<dyn Stream<Item = S> + Send>> {
todo!()
}
}
// Generic flavour of the timeout provider: takes the input `S` by value and returns a
// `TimeoutableStream<S>`.
//
// Not implemented yet: the body is `todo!()`, so any call panics.
impl<S> streams::streamtimeout::StreamTimeout2<S> for StreamTimeout {
// `inp` is not used yet.
fn timeout_intervals(&self, inp: S) -> TimeoutableStream<S> {
todo!()
}
}

View File

@@ -0,0 +1,111 @@
use crate::tcpreadasbytes::TcpReadAsBytes;
use futures_util::Stream;
use futures_util::TryStreamExt;
use items_0::framable::FrameTypeInnerStatic;
use items_0::streamitem::sitem_data;
use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::Sitemty;
use items_2::eventfull::EventFull;
use items_2::framable::Framable;
use items_2::frame::make_term_frame;
use netpod::log::*;
use netpod::Cluster;
use netpod::Node;
use netpod::ReqCtx;
use query::api4::events::EventsSubQuery;
use serde::de::DeserializeOwned;
use std::fmt;
use std::pin::Pin;
use streams::frames::eventsfromframes::EventsFromFrames;
use streams::frames::inmem::BoxedBytesStream;
use streams::frames::inmem::InMemoryFrameStream;
use streams::tcprawclient::make_node_command_frame;
use streams::tcprawclient::x_processed_event_blobs_stream_from_node_http;
use streams::tcprawclient::HttpSimplePost;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
// Error type returned by the functions in this module.
//
// Every variant wraps exactly one source error and is marked `#[from]`, which is what lets the
// functions below propagate those errors with `?`.
// NOTE(review): `#[cstm(..)]` is not a stock `thiserror` attribute; it presumably comes from the
// patched `thiserror` dependency and sets the name used when displaying the error — confirm.
#[derive(Debug, thiserror::Error)]
#[cstm(name = "TcpRawClient")]
pub enum Error {
// I/O failure, e.g. from connecting or writing to the socket.
IO(#[from] std::io::Error),
// Error from `items_2::frame`.
Frame(#[from] items_2::frame::Error),
// Error from `items_2::framable`.
Framable(#[from] items_2::framable::Error),
// Error from the transport-independent client code in `streams::tcprawclient`.
StreamsRawClient(#[from] streams::tcprawclient::Error),
}
/// Pinned, boxed, `Send` stream whose items are `Sitemty<T>`.
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
/// Requests event blobs from a single node over a raw TCP connection.
///
/// Connects to `node.host:node.port_raw`, writes the command frame built from `subq` followed by
/// a terminator frame, and returns the node's reply wrapped as
/// `TcpReadAsBytes` -> `InMemoryFrameStream` -> `EventsFromFrames`.
///
/// # Errors
///
/// Fails if the command frame cannot be built or encoded, or if connecting to or writing to the
/// node fails. Errors that occur later while reading are delivered as items of the returned stream.
pub async fn x_processed_event_blobs_stream_from_node_tcp(
    subq: EventsSubQuery,
    node: Node,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
    let addr = format!("{}:{}", node.host, node.port_raw);
    debug!("x_processed_event_blobs_stream_from_node to: {addr}",);
    // Build the command before connecting so that a bad query never opens a socket.
    let command = make_node_command_frame(subq.clone())?;
    let (read_half, mut write_half) = TcpStream::connect(addr.as_str()).await?.into_split();

    // The request consists of the command frame followed by a terminator frame.
    let command_item = sitem_data(command);
    let command_bytes = command_item.make_frame_dyn()?;
    write_half.write_all(&command_bytes).await?;
    let term_bytes = make_term_frame()?;
    write_half.write_all(&term_bytes).await?;
    write_half.flush().await?;
    // Release the write half without shutting down the write direction of the socket.
    write_half.forget();

    let bytes_in = TcpReadAsBytes::new(read_half).map_err(sitem_err2_from_string);
    let bytes_in = Box::pin(bytes_in) as BoxedBytesStream;
    let frames = InMemoryFrameStream::new(bytes_in, subq.inmem_bufcap()).map_err(sitem_err2_from_string);
    let frames = Box::pin(frames);
    Ok(Box::pin(EventsFromFrames::new(frames, addr)))
}
/// Opens one raw TCP event stream per node of `cluster`, strictly one node after the other.
///
/// Every node receives the same command frame (built once from `subq`) followed by a terminator
/// frame. The returned vector holds the streams in the order of `cluster.nodes`.
///
/// # Errors
///
/// Returns the first error encountered while building the command, connecting, or writing;
/// streams that were already opened are dropped in that case.
#[allow(unused)]
async fn open_event_data_streams_tcp<T>(subq: EventsSubQuery, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
where
    // TODO group bounds in new trait
    T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
{
    // TODO when unit tests established, change to async connect:
    let command = make_node_command_frame(subq.clone())?;
    let mut opened = Vec::new();
    for node in &cluster.nodes {
        let addr = format!("{}:{}", node.host, node.port_raw);
        debug!("open_tcp_streams to: {addr}");
        let (read_half, mut write_half) = TcpStream::connect(addr.as_str()).await?.into_split();

        // The request consists of the command frame followed by a terminator frame.
        let command_item = sitem_data(command.clone());
        let command_bytes = command_item.make_frame_dyn()?;
        write_half.write_all(&command_bytes).await?;
        let term_bytes = make_term_frame()?;
        write_half.write_all(&term_bytes).await?;
        write_half.flush().await?;
        // Release the write half without shutting down the write direction of the socket.
        write_half.forget();

        // TODO for images, we need larger buffer capacity
        let bytes_in = TcpReadAsBytes::new(read_half).map_err(sitem_err2_from_string);
        let bytes_in = Box::pin(bytes_in) as BoxedBytesStream;
        let frames = InMemoryFrameStream::new(bytes_in, subq.inmem_bufcap()).map_err(sitem_err2_from_string);
        let frames = Box::pin(frames);
        let events = EventsFromFrames::<T>::new(frames, addr);
        opened.push(Box::pin(events) as BoxedStream<T>);
    }
    Ok(opened)
}
/// Opens the event blob stream of a single node.
///
/// Currently used only for the python data api3 protocol endpoint.
///
/// The transport is hard-wired to HTTP (`post` and `ctx` are forwarded to the HTTP client); the
/// raw TCP variant stays compiled but is never selected.
// TODO merge with main method.
pub async fn x_processed_event_blobs_stream_from_node(
    subq: EventsSubQuery,
    node: Node,
    post: Box<dyn HttpSimplePost>,
    ctx: ReqCtx,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
    const USE_HTTP: bool = true;
    if !USE_HTTP {
        return x_processed_event_blobs_stream_from_node_tcp(subq, node).await;
    }
    // `?` converts the `streams::tcprawclient::Error` into this module's `Error`.
    let stream = x_processed_event_blobs_stream_from_node_http(subq, node, post, &ctx).await?;
    Ok(stream)
}

View File

@@ -0,0 +1,49 @@
use bytes::Bytes;
use futures_util::Stream;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
// Error type yielded by the `TcpReadAsBytes` stream.
//
// NOTE(review): `#[cstm(..)]` is not a stock `thiserror` attribute; it presumably comes from the
// patched `thiserror` dependency and sets the name used when displaying the error — confirm.
#[derive(Debug, thiserror::Error)]
#[cstm(name = "TcpReadAsBytes")]
pub enum Error {
// I/O error reported by the underlying reader; created via `#[from]`.
IO(#[from] std::io::Error),
}
/// Adapter that owns a reader `INP` so that it can be consumed as a stream of byte chunks.
pub struct TcpReadAsBytes<INP> {
    /// The wrapped reader.
    inp: INP,
}

impl<INP> TcpReadAsBytes<INP> {
    /// Takes ownership of `inp` and wraps it. No I/O is performed here.
    pub fn new(inp: INP) -> Self {
        TcpReadAsBytes { inp }
    }
}
impl<INP> Stream for TcpReadAsBytes<INP>
where
INP: AsyncRead + Unpin,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let mut buf1 = vec![0; 512];
let mut buf2 = tokio::io::ReadBuf::new(&mut buf1);
match tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inp), cx, &mut buf2) {
Ready(Ok(())) => {
let n = buf2.filled().len();
if n == 0 {
Ready(None)
} else {
buf1.truncate(n);
let item = Bytes::from(buf1);
Ready(Some(Ok(item)))
}
}
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => Pending,
}
}
}

View File

@@ -5,10 +5,10 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
tokio = { version = "1.34", features = ["io-util", "net", "time", "sync", "fs"] }
tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] }
tokio-stream = "0.1.16"
futures-util = "0.3.15"
pin-project = "1.0.12"
tokio-stream = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.1"

View File

@@ -35,44 +35,6 @@ pub type BoxedBytesStream = Pin<Box<dyn Stream<Item = Result<Bytes, SitemErrTy>>
// Disabled trace logging: expands to a `trace!` call guarded by `if false`, so the arguments are
// still compiled and type-checked but the call never executes.
macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ); }
/// Adapter that owns a reader `INP` so that it can be consumed as a stream of byte chunks.
pub struct TcpReadAsBytes<INP> {
// The wrapped reader.
inp: INP,
}
impl<INP> TcpReadAsBytes<INP> {
/// Takes ownership of `inp` and wraps it. No I/O is performed here.
pub fn new(inp: INP) -> Self {
Self { inp }
}
}
// Exposes the wrapped reader as a stream of `Bytes` chunks.
//
// Every poll allocates a fresh 128-byte buffer and performs one `poll_read` on the reader.
impl<INP> Stream for TcpReadAsBytes<INP>
where
INP: AsyncRead + Unpin,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
// TODO keep this small as long as InMemoryFrameStream uses SlideBuf internally.
let mut buf1 = vec![0; 128];
let mut buf2 = tokio::io::ReadBuf::new(&mut buf1);
match tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inp), cx, &mut buf2) {
Ready(Ok(())) => {
let n = buf2.filled().len();
if n == 0 {
// A successful read of zero bytes ends the stream.
Ready(None)
} else {
// Hand out only the bytes that were actually read.
buf1.truncate(n);
let item = Bytes::from(buf1);
Ready(Some(Ok(item)))
}
}
// I/O errors are yielded as items; no state is kept, so a later poll reads again.
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => Pending,
}
}
}
/// Interprets a byte stream as length-delimited frames.
///
/// Emits each frame as a single item. Therefore, each item must fit easily into memory.

View File

@@ -20,6 +20,7 @@ pub mod plaineventsstream;
pub mod print_on_done;
pub mod rangefilter2;
pub mod slidebuf;
pub mod streamtimeout;
pub mod tcprawclient;
#[cfg(test)]
pub mod test;

View File

@@ -0,0 +1,34 @@
use futures_util::Stream;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
/// Placeholder stream type parameterised over an input `S`.
///
/// At present it stores no input at all: the only field is a `PhantomData<S>` marker.
pub struct TimeoutableStream<S> {
    /// Ties the otherwise unused type parameter `S` to the struct.
    _t1: PhantomData<S>,
}

impl<S> TimeoutableStream<S> {
    /// Creates the (empty) value. Private to this module.
    fn new() -> Self {
        TimeoutableStream { _t1: PhantomData }
    }
}
// Stream implementation for `TimeoutableStream`; each item is an `Option` of the input
// stream's item type.
// NOTE(review): presumably `None` is meant to signal that a timeout interval elapsed without an
// input item — confirm once implemented.
//
// Not implemented yet: `poll_next` is `todo!()`, so polling panics.
impl<S> Stream for TimeoutableStream<S>
where
S: Stream,
{
type Item = Option<<S as Stream>::Item>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
todo!()
}
}
/// Provider interface working on boxed streams; implementors must be `Send`.
pub trait StreamTimeout<S>: Send {
/// Takes a pinned, boxed, `Send` stream of `S` and returns a stream of the same type.
// NOTE(review): presumably the returned stream adds timeout handling to `inp` — the exact
// semantics are not defined in this file; confirm against the implementors.
fn timeout_intervals(&self, inp: Pin<Box<dyn Stream<Item = S> + Send>>) -> Pin<Box<dyn Stream<Item = S> + Send>>;
}
/// Provider interface that takes the input `S` by value; implementors must be `Send`.
pub trait StreamTimeout2<S>: Send {
/// Consumes `inp` and returns a `TimeoutableStream<S>`.
// NOTE(review): this shares its method name with `StreamTimeout::timeout_intervals`; method-call
// syntax on a concrete type implementing both traits needs disambiguation when both are in scope.
fn timeout_intervals(&self, inp: S) -> TimeoutableStream<S>;
}

View File

@@ -1,12 +1,6 @@
//! Delivers event data.
//!
//! Delivers event data (not yet time-binned) from local storage and provides client functions
//! to request such data from nodes.
use crate::frames::eventsfromframes::EventsFromFrames;
use crate::frames::inmem::BoxedBytesStream;
use crate::frames::inmem::InMemoryFrameStream;
use crate::frames::inmem::TcpReadAsBytes;
use bytes::Bytes;
use bytes::BytesMut;
use futures_util::Future;
@@ -22,12 +16,10 @@ use items_0::streamitem::Sitemty;
use items_2::eventfull::EventFull;
use items_2::framable::EventQueryJsonStringFrame;
use items_2::framable::Framable;
use items_2::frame::make_term_frame;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::ByteSize;
use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
use netpod::Node;
use netpod::ReqCtx;
use netpod::APP_OCTET;
@@ -40,8 +32,6 @@ use serde::de::DeserializeOwned;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
// NOTE(review): presumably the backend name that identifies the built-in test backend — confirm
// against the places that compare against it.
pub const TEST_BACKEND: &str = "testbackend-00";
@@ -97,31 +87,6 @@ pub fn make_node_command_frame(query: EventsSubQuery) -> Result<EventQueryJsonSt
Ok(EventQueryJsonStringFrame(ret))
}
/// Requests event blobs from a single node over a raw TCP connection.
///
/// Connects to `node.host:node.port_raw`, writes the command frame built from `subq` followed by
/// a terminator frame, and returns the node's reply wrapped as
/// `TcpReadAsBytes` -> `InMemoryFrameStream` -> `EventsFromFrames`.
///
/// Fails if the command frame cannot be built or encoded, or if connecting to or writing to the
/// node fails.
pub async fn x_processed_event_blobs_stream_from_node_tcp(
subq: EventsSubQuery,
node: Node,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
let addr = format!("{}:{}", node.host, node.port_raw);
debug!("x_processed_event_blobs_stream_from_node to: {addr}",);
let frame1 = make_node_command_frame(subq.clone())?;
let net = TcpStream::connect(addr.clone()).await?;
let (netin, mut netout) = net.into_split();
// The request consists of the command frame followed by a terminator frame.
let item = sitem_data(frame1);
let buf = item.make_frame_dyn()?;
netout.write_all(&buf).await?;
let buf = make_term_frame()?;
netout.write_all(&buf).await?;
netout.flush().await?;
// Release the write half without shutting down the write direction of the socket.
netout.forget();
let inp = TcpReadAsBytes::new(netin).map_err(sitem_err2_from_string);
let inp = Box::pin(inp) as BoxedBytesStream;
let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap());
let frames = frames.map_err(sitem_err2_from_string);
let frames = Box::pin(frames);
let items = EventsFromFrames::new(frames, addr);
Ok(Box::pin(items))
}
#[derive(Debug, thiserror::Error)]
pub enum ErrorBody {
#[error("{0}")]
@@ -202,57 +167,6 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
Ok(Box::pin(stream))
}
// Currently used only for the python data api3 protocol endpoint.
// TODO merge with main method.
// Opens the event blob stream of a single node. The `if true` hard-wires the HTTP transport
// (`post` and `ctx` are forwarded to it); the raw TCP branch stays compiled but never runs.
pub async fn x_processed_event_blobs_stream_from_node(
subq: EventsSubQuery,
node: Node,
post: Box<dyn HttpSimplePost>,
ctx: ReqCtx,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
if true {
x_processed_event_blobs_stream_from_node_http(subq, node, post, &ctx).await
} else {
x_processed_event_blobs_stream_from_node_tcp(subq, node).await
}
}
/// Pinned, boxed, `Send` stream whose items are `Sitemty<T>`.
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
/// Opens one raw TCP event stream per node of `cluster`, strictly one node after the other.
///
/// Every node receives the same command frame (built once from `subq`) followed by a terminator
/// frame. The returned vector holds the streams in the order of `cluster.nodes`. The first error
/// while building the command, connecting, or writing aborts the whole call.
#[allow(unused)]
async fn open_event_data_streams_tcp<T>(subq: EventsSubQuery, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
where
// TODO group bounds in new trait
T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
{
// TODO when unit tests established, change to async connect:
let frame1 = make_node_command_frame(subq.clone())?;
let mut streams = Vec::new();
for node in &cluster.nodes {
let addr = format!("{}:{}", node.host, node.port_raw);
debug!("open_tcp_streams to: {addr}");
let net = TcpStream::connect(addr.clone()).await?;
let (netin, mut netout) = net.into_split();
// The request consists of the command frame followed by a terminator frame.
let item = sitem_data(frame1.clone());
let buf = item.make_frame_dyn()?;
netout.write_all(&buf).await?;
let buf = make_term_frame()?;
netout.write_all(&buf).await?;
netout.flush().await?;
// Release the write half without shutting down the write direction of the socket.
netout.forget();
// TODO for images, we need larger buffer capacity
let inp = TcpReadAsBytes::new(netin);
let inp = inp.map_err(sitem_err2_from_string);
let inp = Box::pin(inp) as BoxedBytesStream;
let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap());
let frames = frames.map_err(sitem_err2_from_string);
let frames = Box::pin(frames);
let stream = EventsFromFrames::<T>::new(frames, addr);
// `as _` coerces the concrete stream into the `BoxedStream<T>` element type.
streams.push(Box::pin(stream) as _);
}
Ok(streams)
}
pub fn container_stream_from_bytes_stream<T>(
inp: BoxedBytesStream,
bufcap: ByteSize,

View File

@@ -3,6 +3,8 @@ use crate::collect::CollectResult;
use crate::json_stream::JsonBytes;
use crate::json_stream::JsonStream;
use crate::rangefilter2::RangeFilter2;
use crate::streamtimeout::StreamTimeout;
use crate::streamtimeout::StreamTimeout2;
use crate::tcprawclient::container_stream_from_bytes_stream;
use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
@@ -364,6 +366,7 @@ pub async fn timebinned_json_framed(
ctx: &ReqCtx,
cache_read_provider: Arc<dyn CacheReadProvider>,
events_read_provider: Arc<dyn EventsReadProvider>,
stream_timeout_provider: Box<dyn StreamTimeout2<Box<dyn CollectableDyn>>>,
) -> Result<JsonStream, Error> {
trace!("timebinned_json_framed");
let binned_range = query.covering_range()?;