From db6e55bcb67f4d7d4cf44cfb624ee8da05cbfbfd Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 5 Nov 2024 16:15:32 +0100 Subject: [PATCH] WIP typechecks --- crates/daqbufp2/src/client.rs | 4 +- .../disk/src/merge/mergedblobsfromremotes.rs | 3 +- crates/err/Cargo.toml | 5 +- crates/err/src/lib.rs | 12 --- crates/httpclient/Cargo.toml | 4 + crates/httpclient/src/httpclient.rs | 85 ++++++++++--------- crates/streams/Cargo.toml | 5 +- crates/streams/src/tcprawclient.rs | 81 ++++++++++++++---- 8 files changed, 122 insertions(+), 77 deletions(-) diff --git a/crates/daqbufp2/src/client.rs b/crates/daqbufp2/src/client.rs index d1ebe80..58fa8e0 100644 --- a/crates/daqbufp2/src/client.rs +++ b/crates/daqbufp2/src/client.rs @@ -30,7 +30,7 @@ pub async fn status(host: String, port: u16) -> Result<(), Error> { .uri(uri) .body(body_empty())?; let mut client = httpclient::connect_client(req.uri()).await?; - let res = client.send_request(req).await?; + let res = client.send_request(req).await.map_err(Error::from_string)?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); return Err(Error::with_msg(format!("Server error {:?}", res))); @@ -81,7 +81,7 @@ pub async fn get_binned( .body(body_empty()) .ec()?; let mut client = httpclient::connect_client(req.uri()).await?; - let res = client.send_request(req).await?; + let res = client.send_request(req).await.map_err(Error::from_string)?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); let (head, body) = res.into_parts(); diff --git a/crates/disk/src/merge/mergedblobsfromremotes.rs b/crates/disk/src/merge/mergedblobsfromremotes.rs index 2f939c1..b42577a 100644 --- a/crates/disk/src/merge/mergedblobsfromremotes.rs +++ b/crates/disk/src/merge/mergedblobsfromremotes.rs @@ -32,7 +32,8 @@ impl MergedBlobsFromRemotes { debug!("MergedBlobsFromRemotes::new subq {:?}", subq); let mut tcp_establish_futs = Vec::new(); for node in &cluster.nodes { - let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone(), ctx.clone()); + let post = todo!(); + let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone(), post, ctx.clone()); let f = f.map_err(sitem_err2_from_string); let f: T002 = Box::pin(f); tcp_establish_futs.push(f); diff --git a/crates/err/Cargo.toml b/crates/err/Cargo.toml index 73eeeba..0c86c78 100644 --- a/crates/err/Cargo.toml +++ b/crates/err/Cargo.toml @@ -19,11 +19,10 @@ chrono = { version = "0.4.26", features = ["serde"] } url = "2.4.0" regex = "1.9.1" http = "1.0.0" -hyper = "1.0.1" +#hyper = "1.0.1" thiserror = "=0.0.1" -#thiserror = "1" anyhow = "1.0" -tokio = "1" +#tokio = "1" [patch.crates-io] thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/err/src/lib.rs b/crates/err/src/lib.rs index c140e57..2c1ff36 100644 --- a/crates/err/src/lib.rs +++ b/crates/err/src/lib.rs @@ -436,12 +436,6 @@ impl From for Error { } } -impl From for Error { - fn from(k: tokio::task::JoinError) -> Self { - Self::from_string(format!("{k}")) - } -} - impl From for Error { fn from(k: http::Error) -> Self { Self::from_string(k) @@ -454,12 +448,6 @@ impl From for Error { } } -impl From for Error { - fn from(k: hyper::Error) -> Self { - Self::from_string(k) - } -} - #[derive(Debug, Serialize, Deserialize)] pub struct PublicError { reason: Option, diff --git a/crates/httpclient/Cargo.toml b/crates/httpclient/Cargo.toml index 58f046a..f1b8029 100644 --- a/crates/httpclient/Cargo.toml +++ b/crates/httpclient/Cargo.toml @@ -22,3 +22,7 @@ async-channel = "1.9.0" err = { path = "../err" } netpod = { path = "../netpod" } parse = { path = "../parse" } +thiserror = "0.0.1" + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index efc9344..20dacfb 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -29,6 +29,48 @@ use std::task::Poll; use tokio::net::TcpStream; use url::Url; +#[derive(Debug)] +pub enum Error { + NoHostInUrl, + NoPortInUrl, + Connection, + IO(std::io::Error), + Http, + Body(Box), +} + +impl std::error::Error for Error {} + +impl From for Error { + fn from(value: std::io::Error) -> Self { + Self::IO(value) + } +} + +impl From for Error { + fn from(_: http::Error) -> Self { + Self::Http + } +} + +impl From for Error { + fn from(_: hyper::Error) -> Self { + Self::Http + } +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{self:?}") + } +} + +impl err::ToErr for Error { + fn to_err(self) -> err::Error { + err::Error::with_msg_no_trace(format!("self")) + } +} + pub type BodyBox = BoxBody; pub type RespBox = Response; pub type Requ = Request; @@ -229,47 +271,6 @@ impl From for BodyError { } } -#[derive(Debug)] -pub enum Error { - NoHostInUrl, - NoPortInUrl, - Connection, - IO(std::io::Error), - Http, -} - -impl std::error::Error for Error {} - -impl From for Error { - fn from(value: std::io::Error) -> Self { - Self::IO(value) - } -} - -impl From for Error { - fn from(_: http::Error) -> Self { - Self::Http - } -} - -impl From for Error { - fn from(_: hyper::Error) -> Self { - Self::Http - } -} - -impl fmt::Display for Error { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{self:?}") - } -} - -impl err::ToErr for Error { - fn to_err(self) -> err::Error { - err::Error::with_msg_no_trace(format!("self")) - } -} - pub struct HttpResponse { pub head: http::response::Parts, pub body: Bytes, @@ -404,7 +405,7 @@ impl futures_util::Stream for IncomingStream { Ready(Some(Ok(Bytes::new()))) } } - Err(e) => Ready(Some(Err(e.into()))), + Err(e) => Ready(Some(Err(err::Error::from_string(e)))), }, Ready(None) => Ready(None), Pending => Pending, diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 6fb57e0..cdfc8d3 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -28,7 +28,10 @@ query = { path = "../query" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } -httpclient = { path = "../httpclient" } +#httpclient = { path = "../httpclient" } +http = "1" +http-body = "1" +http-body-util = "0.1.0" [dev-dependencies] taskrun = { path = "../taskrun" } diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index a0396b2..c568d2f 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -7,12 +7,13 @@ use crate::frames::eventsfromframes::EventsFromFrames; use crate::frames::inmem::BoxedBytesStream; use crate::frames::inmem::InMemoryFrameStream; use crate::frames::inmem::TcpReadAsBytes; +use bytes::Bytes; +use bytes::BytesMut; use futures_util::Future; use futures_util::Stream; +use futures_util::StreamExt; use futures_util::TryStreamExt; use http::Uri; -use httpclient::body_bytes; -use httpclient::http; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::sitem_data; use items_0::streamitem::sitem_err2_from_string; @@ -52,10 +53,11 @@ pub enum Error { Framable(#[from] items_2::framable::Error), Json(#[from] serde_json::Error), Http(#[from] http::Error), - HttpClient(#[from] httpclient::Error), - Hyper(#[from] httpclient::hyper::Error), + // HttpClient(#[from] httpclient::Error), + // Hyper(#[from] httpclient::hyper::Error), #[error("ServerError({0:?}, {1})")] ServerError(http::response::Parts, String), + HttpBody(Box), } struct ErrMsg(E) @@ -119,42 +121,88 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp( Ok(Box::pin(items)) } +#[derive(Debug, thiserror::Error)] +pub enum ErrorBody {} + +pub trait HttpSimplePost: Send { + fn http_simple_post( + &self, + // req: http::Request>>, + req: http::Request>, + ) -> http::Response< + http_body_util::StreamBody, ErrorBody>> + Send>>>, + >; +} + +pub async fn read_body_bytes(mut body: B) -> Result +where + B: http_body::Body + Unpin, + ::Error: std::error::Error + Send + 'static, +{ + use bytes::BufMut; + use http_body_util::BodyExt; + let mut buf = BytesMut::new(); + while let Some(x) = body.frame().await { + let mut frame = x.map_err(|e| Error::HttpBody(Box::new(e)))?; + if let Some(x) = frame.data_mut() { + buf.put(x); + } + } + Ok(buf.freeze()) +} + pub async fn x_processed_event_blobs_stream_from_node_http( subq: EventsSubQuery, node: Node, + post: Box, ctx: &ReqCtx, ) -> Result> + Send>>, Error> { use http::header; use http::Method; use http::Request; - use httpclient::hyper; - use hyper::StatusCode; - + use http::StatusCode; let frame1 = make_node_command_frame(subq.clone())?; let item = sitem_data(frame1.clone()); - let buf = item.make_frame_dyn()?; - + let buf = item.make_frame_dyn()?.freeze(); let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); debug!("open_event_data_streams_http post {url}"); let uri: Uri = url.as_str().parse().unwrap(); + let body = http_body_util::BodyDataStream::new(buf); let req = Request::builder() .method(Method::POST) .uri(&uri) .header(header::HOST, uri.host().unwrap()) .header(header::ACCEPT, APP_OCTET) .header(ctx.header_name(), ctx.header_value()) - .body(body_bytes(buf))?; - let mut client = httpclient::connect_client(req.uri()).await?; - let res = client.send_request(req).await?; + // .body(body_bytes(buf))?; + .body(body)?; + let res = post.http_simple_post(req); + // let mut client = httpclient::connect_client(req.uri()).await?; + // let res = client.send_request(req).await?; if res.status() != StatusCode::OK { - error!("Server error {:?}", res); let (head, body) = res.into_parts(); - let buf = httpclient::read_body_bytes(body).await?; + error!("server error {:?}", head); + let buf = read_body_bytes(body).await?; let s = String::from_utf8_lossy(&buf); return Err(Error::ServerError(head, s.to_string())); } let (_head, body) = res.into_parts(); - let inp = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream; + // while let Some(x) = body.next().await { + // let fr = x?; + // } + let inp = body; + let inp = inp.map(|x| match x { + Ok(x) => match x.into_data() { + Ok(x) => Ok(x), + Err(e) => { + debug!("see non-data frame {e:?}"); + Ok(Bytes::new()) + } + }, + Err(e) => Err(sitem_err2_from_string(e)), + }); + let inp = Box::pin(inp) as BoxedBytesStream; + // let inp = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream; let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); let frames = frames.map_err(sitem_err2_from_string); let frames = Box::pin(frames); @@ -168,10 +216,11 @@ pub async fn x_processed_event_blobs_stream_from_node_http( pub async fn x_processed_event_blobs_stream_from_node( subq: EventsSubQuery, node: Node, + post: Box, ctx: ReqCtx, ) -> Result> + Send>>, Error> { if true { - x_processed_event_blobs_stream_from_node_http(subq, node, &ctx).await + x_processed_event_blobs_stream_from_node_http(subq, node, post, &ctx).await } else { x_processed_event_blobs_stream_from_node_tcp(subq, node).await }