diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index e153119..e7ed377 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.4.3" +version = "0.4.4-alpha.0" authors = ["Dominik Werder "] edition = "2021" @@ -16,8 +16,7 @@ serde_json = "1.0" serde_yaml = "0.9.16" chrono = "0.4" url = "2.2.2" -clap = { version = "4.3.15", features = ["derive", "cargo"] } -lazy_static = "1.4.0" +clap = { version = "4.3.21", features = ["derive", "cargo"] } err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } diff --git a/crates/daqbufp2/src/client.rs b/crates/daqbufp2/src/client.rs index deea1f2..2ddba40 100644 --- a/crates/daqbufp2/src/client.rs +++ b/crates/daqbufp2/src/client.rs @@ -17,7 +17,8 @@ use netpod::HostPort; use netpod::SfDbChannel; use netpod::APP_OCTET; use query::api4::binned::BinnedQuery; -use streams::frames::inmem::InMemoryFrameAsyncReadStream; +use streams::frames::inmem::InMemoryFrameStream; +use streams::frames::inmem::TcpReadAsBytes; use url::Url; pub async fn status(host: String, port: u16) -> Result<(), Error> { @@ -94,7 +95,7 @@ pub async fn get_binned( ))); } let s1 = HttpBodyAsAsyncRead::new(res); - let s2 = InMemoryFrameAsyncReadStream::new(s1, ByteSize::from_kb(8)); + let s2 = InMemoryFrameStream::new(TcpReadAsBytes::new(s1), ByteSize::from_kb(8)); use futures_util::StreamExt; use std::future::ready; let s3 = s2 diff --git a/crates/disk/src/aggtest.rs b/crates/disk/src/aggtest.rs index 75da0e7..7fc1673 100644 --- a/crates/disk/src/aggtest.rs +++ b/crates/disk/src/aggtest.rs @@ -20,7 +20,7 @@ use netpod::Shape; pub fn make_test_node(id: u32) -> Node { Node { host: "localhost".into(), - listen: "0.0.0.0".into(), + listen: None, port: 8800 + id as u16, port_raw: 8800 + id as u16 + 100, // TODO use a common function to supply the tmp path. diff --git a/crates/disk/src/gen.rs b/crates/disk/src/gen.rs index df0fcbe..3085122 100644 --- a/crates/disk/src/gen.rs +++ b/crates/disk/src/gen.rs @@ -112,7 +112,7 @@ pub async fn gen_test_data() -> Result<(), Error> { for i1 in 0..3 { let node = Node { host: "localhost".into(), - listen: "0.0.0.0".into(), + listen: None, port: 7780 + i1 as u16, port_raw: 7780 + i1 as u16 + 100, cache_base_path: data_base_path.join(format!("node{:02}", i1)), diff --git a/crates/err/src/lib.rs b/crates/err/src/lib.rs index 85e04a6..8e7f810 100644 --- a/crates/err/src/lib.rs +++ b/crates/err/src/lib.rs @@ -466,6 +466,13 @@ impl fmt::Display for PublicError { } } +impl ToPublicError for Error { + fn to_public_error(&self) -> String { + let e = PublicError::from(self); + e.msg().into() + } +} + pub fn todo() { let bt = backtrace::Backtrace::new(); eprintln!("TODO\n{bt:?}"); diff --git a/crates/httpclient/src/lib.rs b/crates/httpclient/src/lib.rs index 103cd3c..25060ac 100644 --- a/crates/httpclient/src/lib.rs +++ b/crates/httpclient/src/lib.rs @@ -2,4 +2,5 @@ pub mod httpclient; pub use crate::httpclient::*; pub use http; +pub use hyper; pub use url; diff --git a/crates/httpret/src/api4.rs b/crates/httpret/src/api4.rs index 342c73f..abaea30 100644 --- a/crates/httpret/src/api4.rs +++ b/crates/httpret/src/api4.rs @@ -1,5 +1,6 @@ pub mod binned; pub mod databuffer_tools; +pub mod eventdata; pub mod events; pub mod search; pub mod status; diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs new file mode 100644 index 0000000..a109db9 --- /dev/null +++ b/crates/httpret/src/api4/eventdata.rs @@ -0,0 +1,89 @@ +use crate::response; +use crate::response_err; +use crate::ReqCtx; +use err::thiserror; +use err::ThisError; +use err::ToPublicError; +use futures_util::TryStreamExt; +use http::Method; +use http::Request; +use http::Response; +use http::StatusCode; +use hyper::Body; +use netpod::log::*; +use netpod::NodeConfigCached; +use netpod::ServiceVersion; + +#[derive(Debug, ThisError)] +pub enum EventDataError { + HttpBadMethod, + HttpBadAccept, + QueryParse, + #[error("Error({0})")] + Error(Box), + InternalError, +} + +impl ToPublicError for EventDataError { + fn to_public_error(&self) -> String { + match self { + EventDataError::HttpBadMethod => format!("{self}"), + EventDataError::HttpBadAccept => format!("{self}"), + EventDataError::QueryParse => format!("{self}"), + EventDataError::Error(e) => e.to_public_error(), + EventDataError::InternalError => format!("{self}"), + } + } +} + +pub struct EventDataHandler {} + +impl EventDataHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path().eq("/api/4/private/eventdata/frames") { + Some(Self {}) + } else { + None + } + } + + pub async fn handle( + &self, + req: Request, + _ctx: &ReqCtx, + ncc: &NodeConfigCached, + _service_version: &ServiceVersion, + ) -> Result, EventDataError> { + if req.method() != Method::POST { + Ok(response(StatusCode::NOT_ACCEPTABLE) + .body(Body::empty()) + .map_err(|_| EventDataError::InternalError)?) + } else { + match Self::handle_req(req, ncc).await { + Ok(ret) => Ok(ret), + Err(e) => { + error!("{e}"); + let res = response_err(StatusCode::NOT_ACCEPTABLE, e.to_public_error()) + .map_err(|_| EventDataError::InternalError)?; + Ok(res) + } + } + } + } + + async fn handle_req(req: Request, ncc: &NodeConfigCached) -> Result, EventDataError> { + let (_head, body) = req.into_parts(); + let frames = + nodenet::conn::events_get_input_frames(body.map_err(|e| err::Error::with_msg_no_trace(e.to_string()))) + .await + .map_err(|_| EventDataError::InternalError)?; + let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?; + let stream = nodenet::conn::create_response_bytes_stream(evsubq, ncc) + .await + .map_err(|e| EventDataError::Error(Box::new(e)))?; + let ret = response(StatusCode::OK) + .body(Body::wrap_stream(stream)) + .map_err(|_| EventDataError::InternalError)?; + Ok(ret) + } +} diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 8b82824..6858b0e 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -132,14 +132,9 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion if let Some(bind) = node_config.node.prometheus_api_bind { tokio::spawn(prometheus::host(bind)); } - let _update_task = if node_config.node_config.cluster.run_map_pulse_task { - Some(UpdateTask::new(node_config.clone())) - } else { - None - }; - let rawjh = taskrun::spawn(events_service(node_config.clone())); + // let rawjh = taskrun::spawn(events_service(node_config.clone())); use std::str::FromStr; - let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?; + let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?; let make_service = make_service_fn({ move |conn: &AddrStream| { debug!("new connection from {:?}", conn.remote_addr()); @@ -147,20 +142,19 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion let addr = conn.remote_addr(); let service_version = service_version.clone(); async move { - Ok::<_, Error>(service_fn({ - move |req| { - // TODO send to logstash - info!( - "http-request {:?} - {:?} - {:?} - {:?}", - addr, - req.method(), - req.uri(), - req.headers() - ); - let f = http_service(req, node_config.clone(), service_version.clone()); - Cont { f: Box::pin(f) } - } - })) + let ret = service_fn(move |req| { + // TODO send to logstash + info!( + "http-request {:?} - {:?} - {:?} - {:?}", + addr, + req.method(), + req.uri(), + req.headers() + ); + let f = http_service(req, node_config.clone(), service_version.clone()); + Cont { f: Box::pin(f) } + }); + Ok::<_, Error>(ret) } } }); @@ -168,7 +162,7 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion .serve(make_service) .await .map(|e| RetrievalError::TextError(format!("{e:?}")))?; - rawjh.await??; + // rawjh.await??; Ok(()) } @@ -382,6 +376,10 @@ async fn http_service_inner( } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if let Some(h) = api4::eventdata::EventDataHandler::handler(&req) { + Ok(h.handle(req, ctx, &node_config, service_version) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?) } else if let Some(h) = api4::status::StatusNodesRecursive::handler(&req) { Ok(h.handle(req, ctx, &node_config, service_version).await?) } else if let Some(h) = StatusBoardAllHandler::handler(&req) { diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index e6c2b17..9e8a6bb 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -164,7 +164,6 @@ pub struct MapPulseHisto { _counts: Vec, } -const MAP_INDEX_FULL_URL_PREFIX: &'static str = "/api/1/map/index/full/"; const _MAP_INDEX_FAST_URL_PREFIX: &'static str = "/api/1/map/index/fast/"; const MAP_PULSE_HISTO_URL_PREFIX: &'static str = "/api/1/map/pulse/histo/"; const MAP_PULSE_URL_PREFIX: &'static str = "/api/1/map/pulse/"; @@ -490,7 +489,7 @@ pub struct IndexFullHttpFunction {} impl IndexFullHttpFunction { pub fn handler(req: &Request) -> Option { - if req.uri().path().starts_with(MAP_INDEX_FULL_URL_PREFIX) { + if req.uri().path().eq("/api/1/map/index/full") { Some(Self {}) } else { None @@ -724,7 +723,10 @@ impl Future for UpdateTask { } impl UpdateTask { - pub fn new(node_config: NodeConfigCached) -> UpdateTaskGuard { + /// Returns a guard which must be kept alive as long as the service should run. + /// Should instead of this use a system-timer and call the rest api. + #[allow(unused)] + fn new(node_config: NodeConfigCached) -> UpdateTaskGuard { let do_abort = Arc::new(AtomicUsize::default()); let task = Self { do_abort: do_abort.clone(), diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index c6dacc8..7df04c5 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -381,10 +381,10 @@ pub struct ChannelArchiver { pub struct Node { pub host: String, // TODO for `listen` and the ports, would be great to allow a default on Cluster level. - pub listen: String, + pub listen: Option, #[serde(deserialize_with = "serde_port::port_from_any")] pub port: u16, - #[serde(deserialize_with = "serde_port::port_from_any")] + #[serde(deserialize_with = "serde_port::port_from_any", default)] pub port_raw: u16, pub cache_base_path: PathBuf, pub sf_databuffer: Option, @@ -471,7 +471,7 @@ impl Node { pub fn dummy() -> Self { Self { host: "dummy".into(), - listen: "dummy".into(), + listen: None, port: 4444, port_raw: 4444, cache_base_path: PathBuf::new(), @@ -489,7 +489,17 @@ impl Node { // TODO should a node know how to reach itself? Because, depending on network // topology (proxies etc.) the way to reach a node depends on the tuple `(node, client)`. pub fn baseurl(&self) -> Url { - format!("http://{}:{}/api/4/", self.host, self.port).parse().unwrap() + // TODO should be able to decide whether we are reachable via tls. + // So far this does not matter because this `baseurl` is used for internal communication + // and is always non-tls. + format!("http://{}:{}", self.host, self.port).parse().unwrap() + } + + pub fn listen(&self) -> String { + match &self.listen { + Some(x) => x.into(), + None => "0.0.0.0".into(), + } } } @@ -2923,7 +2933,7 @@ pub fn test_cluster() -> Cluster { .into_iter() .map(|id| Node { host: "localhost".into(), - listen: "0.0.0.0".into(), + listen: None, port: 6170 + id as u16, port_raw: 6170 + id as u16 + 100, cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), @@ -2960,7 +2970,7 @@ pub fn sls_test_cluster() -> Cluster { .into_iter() .map(|id| Node { host: "localhost".into(), - listen: "0.0.0.0".into(), + listen: None, port: 6190 + id as u16, port_raw: 6190 + id as u16 + 100, cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), @@ -2995,7 +3005,7 @@ pub fn archapp_test_cluster() -> Cluster { .into_iter() .map(|id| Node { host: "localhost".into(), - listen: "0.0.0.0".into(), + listen: None, port: 6200 + id as u16, port_raw: 6200 + id as u16 + 100, cache_base_path: test_data_base_path_databuffer().join(format!("node{:02}", id)), diff --git a/crates/nodenet/src/channelconfig.rs b/crates/nodenet/src/channelconfig.rs index 9eff1a0..eafbbb9 100644 --- a/crates/nodenet/src/channelconfig.rs +++ b/crates/nodenet/src/channelconfig.rs @@ -177,7 +177,7 @@ pub async fn http_get_channel_config( baseurl: Url, ) -> Result, Error> { let url = baseurl; - let mut url = url.join("channel/config").unwrap(); + let mut url = url.join("/api/4/channel/config").unwrap(); qu.append_to_url(&mut url); let res = httpclient::http_get(url, APP_JSON).await?; use httpclient::http::StatusCode; diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 0c66fac..ffd4051 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -1,4 +1,5 @@ use crate::scylla::scylla_channel_event_stream; +use bytes::Bytes; use err::thiserror; use err::Error; use err::ThisError; @@ -27,13 +28,13 @@ use query::api4::events::EventsSubQuery; use query::api4::events::Frame1Parts; use std::net::SocketAddr; use std::pin::Pin; -use streams::frames::inmem::InMemoryFrameAsyncReadStream; +use streams::frames::inmem::InMemoryFrameStream; +use streams::frames::inmem::TcpReadAsBytes; use streams::generators::GenerateF64V00; use streams::generators::GenerateI32V00; use streams::generators::GenerateI32V01; use streams::transform::build_event_transform; use tokio::io::AsyncWriteExt; -use tokio::net::tcp::OwnedReadHalf; use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; @@ -47,7 +48,7 @@ mod test; pub enum NodeNetError {} pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> { - let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw); + let addr = format!("{}:{}", node_config.node.listen(), node_config.node.port_raw); let lis = tokio::net::TcpListener::bind(addr).await?; loop { match lis.accept().await { @@ -153,69 +154,70 @@ async fn make_channel_events_stream( Ok(ret) } +pub type BytesStreamBox = Pin> + Send>>; + +pub async fn create_response_bytes_stream( + evq: EventsSubQuery, + ncc: &NodeConfigCached, +) -> Result { + let reqctx = netpod::ReqCtx::new(evq.reqid()); + if evq.create_errors_contains("nodenet_parse_query") { + let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query"); + return Err(e); + } + if evq.is_event_blobs() { + // TODO support event blobs as transform + let fetch_info = evq.ch_conf().to_sf_databuffer()?; + let stream = disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc).await?; + // let stream = stream.map(|x| Box::new(x) as _); + let stream = stream.map(|x| x.make_frame().map(|x| x.freeze())); + let ret = Box::pin(stream); + Ok(ret) + } else { + let stream = make_channel_events_stream(evq.clone(), reqctx, ncc).await?; + if false { + // TODO wasm example + use wasmer::Value; + let wasm = b""; + let mut store = wasmer::Store::default(); + let module = wasmer::Module::new(&store, wasm).unwrap(); + let import_object = wasmer::imports! {}; + let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); + let add_one = instance.exports.get_function("event_transform").unwrap(); + let result = add_one.call(&mut store, &[Value::I32(42)]).unwrap(); + assert_eq!(result[0], Value::I32(43)); + } + let mut tr = match build_event_transform(evq.transform()) { + Ok(x) => x, + Err(e) => { + return Err(e); + } + }; + let stream = stream.map(move |x| { + let item = on_sitemty_data!(x, |x| { + let x: Box = Box::new(x); + let x = tr.0.transform(x); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) + }); + Box::new(item) as Box + }); + let stream = stream.map(|x| x.make_frame().map(|x| x.freeze())); + let ret = Box::pin(stream); + Ok(ret) + } +} + async fn events_conn_handler_with_reqid( mut netout: OwnedWriteHalf, evq: EventsSubQuery, ncc: &NodeConfigCached, ) -> Result<(), ConnErr> { - let reqctx = netpod::ReqCtx::new(evq.reqid()); - if evq.create_errors_contains("nodenet_parse_query") { - let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query"); - return Err((e, netout).into()); - } - let stream: Pin> + Send>> = if evq.is_event_blobs() { - // TODO support event blobs as transform - let fetch_info = match evq.ch_conf().to_sf_databuffer() { - Ok(x) => x, - Err(e) => return Err((e, netout).into()), - }; - match disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc).await { - Ok(stream) => { - let stream = stream.map(|x| Box::new(x) as _); - Box::pin(stream) - } - Err(e) => return Err((e, netout).into()), - } - } else { - match make_channel_events_stream(evq.clone(), reqctx, ncc).await { - Ok(stream) => { - if false { - // TODO wasm example - use wasmer::Value; - let wasm = b""; - let mut store = wasmer::Store::default(); - let module = wasmer::Module::new(&store, wasm).unwrap(); - let import_object = wasmer::imports! {}; - let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); - let add_one = instance.exports.get_function("event_transform").unwrap(); - let result = add_one.call(&mut store, &[Value::I32(42)]).unwrap(); - assert_eq!(result[0], Value::I32(43)); - } - let mut tr = match build_event_transform(evq.transform()) { - Ok(x) => x, - Err(e) => { - return Err((e, netout).into()); - } - }; - let stream = stream.map(move |x| { - let item = on_sitemty_data!(x, |x| { - let x: Box = Box::new(x); - let x = tr.0.transform(x); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) - }); - Box::new(item) as Box - }); - Box::pin(stream) - } - Err(e) => { - return Err((e, netout).into()); - } - } + let mut stream = match create_response_bytes_stream(evq, ncc).await { + Ok(x) => x, + Err(e) => return Err((e, netout))?, }; - let mut stream = stream; let mut buf_len_histo = HistoLog2::new(5); while let Some(item) = stream.next().await { - let item = item.make_frame(); match item { Ok(buf) => { buf_len_histo.ingest(buf.len() as u32); @@ -265,8 +267,11 @@ async fn events_conn_handler_with_reqid( Ok(()) } -async fn events_get_input_frames(netin: OwnedReadHalf) -> Result, Error> { - let mut h = InMemoryFrameAsyncReadStream::new(netin, netpod::ByteSize::from_kb(8)); +pub async fn events_get_input_frames(netin: INP) -> Result, Error> +where + INP: Stream> + Unpin, +{ + let mut h = InMemoryFrameStream::new(netin, netpod::ByteSize::from_kb(8)); let mut frames = Vec::new(); while let Some(k) = h .next() @@ -288,7 +293,7 @@ async fn events_get_input_frames(netin: OwnedReadHalf) -> Result) -> Result<(EventsSubQuery,), Error> { +pub fn events_parse_input_query(frames: Vec) -> Result<(EventsSubQuery,), Error> { if frames.len() != 1 { error!("{:?}", frames); error!("missing command frame len {}", frames.len()); @@ -321,18 +326,21 @@ async fn events_parse_input_query(frames: Vec) -> Result<(EventsS Ok(frame1.parts()) } -async fn events_conn_handler_inner_try( - stream: TcpStream, +async fn events_conn_handler_inner_try( + netin: INP, + netout: OwnedWriteHalf, addr: SocketAddr, ncc: &NodeConfigCached, -) -> Result<(), ConnErr> { +) -> Result<(), ConnErr> +where + INP: Stream> + Unpin, +{ let _ = addr; - let (netin, netout) = stream.into_split(); let frames = match events_get_input_frames(netin).await { Ok(x) => x, Err(e) => return Err((e, netout).into()), }; - let (evq,) = match events_parse_input_query(frames).await { + let (evq,) = match events_parse_input_query(frames) { Ok(x) => x, Err(e) => return Err((e, netout).into()), }; @@ -342,12 +350,16 @@ async fn events_conn_handler_inner_try( events_conn_handler_with_reqid(netout, evq, ncc).instrument(span).await } -async fn events_conn_handler_inner( - stream: TcpStream, +async fn events_conn_handler_inner( + netin: INP, + netout: OwnedWriteHalf, addr: SocketAddr, node_config: &NodeConfigCached, -) -> Result<(), Error> { - match events_conn_handler_inner_try(stream, addr, node_config).await { +) -> Result<(), Error> +where + INP: Stream> + Unpin, +{ + match events_conn_handler_inner_try(netin, netout, addr, node_config).await { Ok(_) => (), Err(ce) => { let mut out = ce.netout; @@ -360,8 +372,10 @@ async fn events_conn_handler_inner( } async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> { + let (netin, netout) = stream.into_split(); + let inp = Box::new(TcpReadAsBytes::new(netin)); let span1 = span!(Level::INFO, "events_conn_handler"); - let r = events_conn_handler_inner(stream, addr, &node_config) + let r = events_conn_handler_inner(inp, netout, addr, &node_config) .instrument(span1) .await; match r { diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index 33d3a42..13b2391 100644 --- a/crates/nodenet/src/conn/test.rs +++ b/crates/nodenet/src/conn/test.rs @@ -32,7 +32,8 @@ use query::api4::events::EventsSubQuery; use query::api4::events::EventsSubQuerySelect; use query::api4::events::EventsSubQuerySettings; use query::transform::TransformQuery; -use streams::frames::inmem::InMemoryFrameAsyncReadStream; +use streams::frames::inmem::InMemoryFrameStream; +use streams::frames::inmem::TcpReadAsBytes; use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; @@ -67,7 +68,7 @@ fn raw_data_00() { }, node: Node { host: "empty".into(), - listen: "listen_dummy".into(), + listen: None, port: 9090, port_raw: 9090, cache_base_path: "".into(), @@ -107,7 +108,8 @@ fn raw_data_00() { con.shutdown().await.unwrap(); eprintln!("shut down"); - let mut frames = InMemoryFrameAsyncReadStream::new(con, qu.inmem_bufcap()); + let (netin, netout) = con.into_split(); + let mut frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), qu.inmem_bufcap()); while let Some(frame) = frames.next().await { match frame { Ok(frame) => match frame { diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 20acea8..b800b67 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -25,6 +25,7 @@ items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } bitshuffle = { path = "../bitshuffle" } +httpclient = { path = "../httpclient" } [dev-dependencies] taskrun = { path = "../taskrun" } diff --git a/crates/streams/src/frames/inmem.rs b/crates/streams/src/frames/inmem.rs index 8f3151d..ae78d91 100644 --- a/crates/streams/src/frames/inmem.rs +++ b/crates/streams/src/frames/inmem.rs @@ -15,7 +15,6 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; -use tokio::io::ReadBuf; #[allow(unused)] macro_rules! trace2 { @@ -29,14 +28,54 @@ impl err::ToErr for crate::slidebuf::Error { } } +pub struct TcpReadAsBytes { + inp: INP, +} + +impl TcpReadAsBytes { + pub fn new(inp: INP) -> Self { + Self { inp } + } +} + +impl Stream for TcpReadAsBytes +where + INP: AsyncRead + Unpin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + // TODO keep this small as long as InMemoryFrameStream uses SlideBuf internally. + let mut buf1 = vec![0; 128]; + let mut buf2 = tokio::io::ReadBuf::new(&mut buf1); + match tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inp), cx, &mut buf2) { + Ready(Ok(())) => { + let n = buf2.filled().len(); + if n == 0 { + Ready(None) + } else { + buf1.truncate(n); + let item = Bytes::from(buf1); + Ready(Some(Ok(item))) + } + } + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => Pending, + } + } +} + /// Interprets a byte stream as length-delimited frames. /// /// Emits each frame as a single item. Therefore, each item must fit easily into memory. -pub struct InMemoryFrameAsyncReadStream +pub struct InMemoryFrameStream where - T: AsyncRead + Unpin, + T: Stream> + Unpin, { inp: T, + // TODO since we moved to input stream of Bytes, we have the danger that the ring buffer + // is not large enough. Actually, this should rather use a RopeBuf with incoming owned bufs. buf: SlideBuf, need_min: usize, done: bool, @@ -44,9 +83,9 @@ where inp_bytes_consumed: u64, } -impl InMemoryFrameAsyncReadStream +impl InMemoryFrameStream where - T: AsyncRead + Unpin, + T: Stream> + Unpin, { pub fn type_name() -> &'static str { std::any::type_name::() @@ -66,20 +105,35 @@ where fn poll_upstream(&mut self, cx: &mut Context) -> Poll> { trace2!("poll_upstream"); use Poll::*; - let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min - self.buf.len())?); + // use tokio::io::AsyncRead; + // use tokio::io::ReadBuf; + // let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min.saturating_sub(self.buf.len()))?); let inp = &mut self.inp; pin_mut!(inp); trace!("poll_upstream"); - match AsyncRead::poll_read(inp, cx, &mut buf) { - Ready(Ok(())) => { - let n = buf.filled().len(); - self.buf.wadv(n)?; - trace2!("recv bytes {}", n); - Ready(Ok(n)) - } - Ready(Err(e)) => Ready(Err(e.into())), + match inp.poll_next(cx) { + Ready(Some(Ok(x))) => match self.buf.available_writable_area(x.len()) { + Ok(dst) => { + dst[..x.len()].copy_from_slice(&x); + self.buf.wadv(x.len())?; + Ready(Ok(x.len())) + } + Err(e) => Ready(Err(e.into())), + }, + Ready(Some(Err(_e))) => Ready(Err(Error::with_msg_no_trace("input error"))), + Ready(None) => Ready(Ok(0)), Pending => Pending, } + // match AsyncRead::poll_read(inp, cx, &mut buf) { + // Ready(Ok(())) => { + // let n = buf.filled().len(); + // self.buf.wadv(n)?; + // trace2!("recv bytes {}", n); + // Ready(Ok(n)) + // } + // Ready(Err(e)) => Ready(Err(e.into())), + // Pending => Pending, + // } } // Try to consume bytes to parse a frame. @@ -157,9 +211,9 @@ where } } -impl Stream for InMemoryFrameAsyncReadStream +impl Stream for InMemoryFrameStream where - T: AsyncRead + Unpin, + T: Stream> + Unpin, { type Item = Result, Error>; diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index bbade7c..acfa000 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -1,5 +1,5 @@ use crate::collect::Collect; -use crate::tcprawclient::open_tcp_streams; +use crate::tcprawclient::open_event_data_streams; use crate::transform::build_merged_event_transform; use crate::transform::EventsToTimeBinnable; use crate::transform::TimeBinnableToCollectable; @@ -35,7 +35,7 @@ pub async fn plain_events_json( let deadline = Instant::now() + evq.timeout(); let mut tr = build_merged_event_transform(evq.transform())?; // TODO make sure the empty container arrives over the network. - let inps = open_tcp_streams::(subq, cluster).await?; + let inps = open_event_data_streams::(subq, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, evq.merger_out_len_max()); diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index 401d01b..c04effe 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -1,12 +1,11 @@ -/*! -Delivers event data. - -Delivers event data (not yet time-binned) from local storage and provides client functions -to request such data from nodes. -*/ +//! Delivers event data. +//! +//! Delivers event data (not yet time-binned) from local storage and provides client functions +//! to request such data from nodes. use crate::frames::eventsfromframes::EventsFromFrames; -use crate::frames::inmem::InMemoryFrameAsyncReadStream; +use crate::frames::inmem::InMemoryFrameStream; +use crate::frames::inmem::TcpReadAsBytes; use err::Error; use futures_util::Stream; use items_0::framable::FrameTypeInnerStatic; @@ -33,7 +32,7 @@ pub fn make_node_command_frame(query: EventsSubQuery) -> Result Result> + Send>>, Error> { @@ -49,17 +48,82 @@ pub async fn x_processed_event_blobs_stream_from_node( netout.write_all(&buf).await?; netout.flush().await?; netout.forget(); - let frames = InMemoryFrameAsyncReadStream::new(netin, subq.inmem_bufcap()); + let frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), subq.inmem_bufcap()); let frames = Box::pin(frames); let items = EventsFromFrames::new(frames, addr); Ok(Box::pin(items)) } +pub async fn x_processed_event_blobs_stream_from_node_http( + subq: EventsSubQuery, + node: Node, +) -> Result> + Send>>, Error> { + use http::header; + use http::Method; + use http::Request; + use httpclient::http; + use httpclient::hyper; + use hyper::Body; + use hyper::StatusCode; + + let frame1 = make_node_command_frame(subq.clone())?; + let item = sitem_data(frame1.clone()); + let buf = item.make_frame()?; + + let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); + debug!("open_event_data_streams_http post {url}"); + let req = Request::builder() + .method(Method::POST) + .uri(url.to_string()) + .header(header::ACCEPT, "application/octet-stream") + .body(Body::from(buf.to_vec())) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let client = hyper::Client::new(); + let res = client + .request(req) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + if res.status() != StatusCode::OK { + error!("Server error {:?}", res); + let (head, body) = res.into_parts(); + let buf = hyper::body::to_bytes(body) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let s = String::from_utf8_lossy(&buf); + return Err(Error::with_msg(format!( + concat!( + "Server error {:?}\n", + "---------------------- message from http body:\n", + "{}\n", + "---------------------- end of http body", + ), + head, s + ))); + } + let (_head, body) = res.into_parts(); + let frames = InMemoryFrameStream::new(body, subq.inmem_bufcap()); + let frames = Box::pin(frames); + let stream = EventsFromFrames::new(frames, url.to_string()); + debug!("open_event_data_streams_http done {url}"); + Ok(Box::pin(stream)) +} + +pub async fn x_processed_event_blobs_stream_from_node( + subq: EventsSubQuery, + node: Node, +) -> Result> + Send>>, Error> { + if true { + x_processed_event_blobs_stream_from_node_http(subq, node).await + } else { + x_processed_event_blobs_stream_from_node_tcp(subq, node).await + } +} + pub type BoxedStream = Pin> + Send>>; -pub async fn open_tcp_streams(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> +async fn open_event_data_streams_tcp(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> where - // Group bounds in new trait + // TODO group bounds in new trait T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, { // TODO when unit tests established, change to async connect: @@ -78,10 +142,81 @@ where netout.flush().await?; netout.forget(); // TODO for images, we need larger buffer capacity - let frames = InMemoryFrameAsyncReadStream::new(netin, subq.inmem_bufcap()); + let frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), subq.inmem_bufcap()); let frames = Box::pin(frames); let stream = EventsFromFrames::::new(frames, addr); streams.push(Box::pin(stream) as _); } Ok(streams) } + +async fn open_event_data_streams_http(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> +where + // TODO group bounds in new trait + T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, +{ + let frame1 = make_node_command_frame(subq.clone())?; + let mut streams = Vec::new(); + for node in &cluster.nodes { + use http::header; + use http::Method; + use http::Request; + use httpclient::http; + use httpclient::hyper; + use hyper::Body; + use hyper::StatusCode; + + let item = sitem_data(frame1.clone()); + let buf = item.make_frame()?; + + let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); + debug!("open_event_data_streams_http post {url}"); + let req = Request::builder() + .method(Method::POST) + .uri(url.to_string()) + .header(header::ACCEPT, "application/octet-stream") + .body(Body::from(buf.to_vec())) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let client = hyper::Client::new(); + let res = client + .request(req) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + if res.status() != StatusCode::OK { + error!("Server error {:?}", res); + let (head, body) = res.into_parts(); + let buf = hyper::body::to_bytes(body) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let s = String::from_utf8_lossy(&buf); + return Err(Error::with_msg(format!( + concat!( + "Server error {:?}\n", + "---------------------- message from http body:\n", + "{}\n", + "---------------------- end of http body", + ), + head, s + ))); + } + let (_head, body) = res.into_parts(); + let frames = InMemoryFrameStream::new(body, subq.inmem_bufcap()); + let frames = Box::pin(frames); + let stream = EventsFromFrames::::new(frames, url.to_string()); + debug!("open_event_data_streams_http done {url}"); + streams.push(Box::pin(stream) as _); + } + Ok(streams) +} + +pub async fn open_event_data_streams(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> +where + // TODO group bounds in new trait + T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, +{ + if true { + open_event_data_streams_http(subq, cluster).await + } else { + open_event_data_streams_tcp(subq, cluster).await + } +} diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 82efba8..eed40c1 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -1,6 +1,6 @@ use crate::collect::Collect; use crate::rangefilter2::RangeFilter2; -use crate::tcprawclient::open_tcp_streams; +use crate::tcprawclient::open_event_data_streams; use crate::timebin::TimeBinnedStream; use crate::transform::build_merged_event_transform; use crate::transform::EventsToTimeBinnable; @@ -48,7 +48,7 @@ async fn timebinnable_stream( let settings = EventsSubQuerySettings::from(&query); let subq = EventsSubQuery::from_parts(select, settings, reqid); let mut tr = build_merged_event_transform(subq.transform())?; - let inps = open_tcp_streams::(subq, &cluster).await?; + let inps = open_event_data_streams::(subq, &cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, query.merger_out_len_max());