WIP typechecks

This commit is contained in:
Dominik Werder
2024-11-05 16:15:32 +01:00
parent 9fd27332c3
commit db6e55bcb6
8 changed files with 122 additions and 77 deletions

View File

@@ -28,7 +28,10 @@ query = { path = "../query" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
parse = { path = "../parse" }
httpclient = { path = "../httpclient" }
#httpclient = { path = "../httpclient" }
http = "1"
http-body = "1"
http-body-util = "0.1.0"
[dev-dependencies]
taskrun = { path = "../taskrun" }

View File

@@ -7,12 +7,13 @@ use crate::frames::eventsfromframes::EventsFromFrames;
use crate::frames::inmem::BoxedBytesStream;
use crate::frames::inmem::InMemoryFrameStream;
use crate::frames::inmem::TcpReadAsBytes;
use bytes::Bytes;
use bytes::BytesMut;
use futures_util::Future;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http::Uri;
use httpclient::body_bytes;
use httpclient::http;
use items_0::framable::FrameTypeInnerStatic;
use items_0::streamitem::sitem_data;
use items_0::streamitem::sitem_err2_from_string;
@@ -52,10 +53,11 @@ pub enum Error {
Framable(#[from] items_2::framable::Error),
Json(#[from] serde_json::Error),
Http(#[from] http::Error),
HttpClient(#[from] httpclient::Error),
Hyper(#[from] httpclient::hyper::Error),
// HttpClient(#[from] httpclient::Error),
// Hyper(#[from] httpclient::hyper::Error),
#[error("ServerError({0:?}, {1})")]
ServerError(http::response::Parts, String),
HttpBody(Box<dyn std::error::Error + Send>),
}
struct ErrMsg<E>(E)
@@ -119,42 +121,88 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp(
Ok(Box::pin(items))
}
#[derive(Debug, thiserror::Error)]
pub enum ErrorBody {}
pub trait HttpSimplePost: Send {
fn http_simple_post(
&self,
// req: http::Request<http_body_util::BodyDataStream<http_body::Frame<Bytes>>>,
req: http::Request<http_body_util::BodyDataStream<Bytes>>,
) -> http::Response<
http_body_util::StreamBody<Pin<Box<dyn Stream<Item = Result<http_body::Frame<Bytes>, ErrorBody>> + Send>>>,
>;
}
pub async fn read_body_bytes<B>(mut body: B) -> Result<Bytes, Error>
where
B: http_body::Body + Unpin,
<B as http_body::Body>::Error: std::error::Error + Send + 'static,
{
use bytes::BufMut;
use http_body_util::BodyExt;
let mut buf = BytesMut::new();
while let Some(x) = body.frame().await {
let mut frame = x.map_err(|e| Error::HttpBody(Box::new(e)))?;
if let Some(x) = frame.data_mut() {
buf.put(x);
}
}
Ok(buf.freeze())
}
pub async fn x_processed_event_blobs_stream_from_node_http(
subq: EventsSubQuery,
node: Node,
post: Box<dyn HttpSimplePost>,
ctx: &ReqCtx,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
use http::header;
use http::Method;
use http::Request;
use httpclient::hyper;
use hyper::StatusCode;
use http::StatusCode;
let frame1 = make_node_command_frame(subq.clone())?;
let item = sitem_data(frame1.clone());
let buf = item.make_frame_dyn()?;
let buf = item.make_frame_dyn()?.freeze();
let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
debug!("open_event_data_streams_http post {url}");
let uri: Uri = url.as_str().parse().unwrap();
let body = http_body_util::BodyDataStream::new(buf);
let req = Request::builder()
.method(Method::POST)
.uri(&uri)
.header(header::HOST, uri.host().unwrap())
.header(header::ACCEPT, APP_OCTET)
.header(ctx.header_name(), ctx.header_value())
.body(body_bytes(buf))?;
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client.send_request(req).await?;
// .body(body_bytes(buf))?;
.body(body)?;
let res = post.http_simple_post(req);
// let mut client = httpclient::connect_client(req.uri()).await?;
// let res = client.send_request(req).await?;
if res.status() != StatusCode::OK {
error!("Server error {:?}", res);
let (head, body) = res.into_parts();
let buf = httpclient::read_body_bytes(body).await?;
error!("server error {:?}", head);
let buf = read_body_bytes(body).await?;
let s = String::from_utf8_lossy(&buf);
return Err(Error::ServerError(head, s.to_string()));
}
let (_head, body) = res.into_parts();
let inp = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream;
// while let Some(x) = body.next().await {
// let fr = x?;
// }
let inp = body;
let inp = inp.map(|x| match x {
Ok(x) => match x.into_data() {
Ok(x) => Ok(x),
Err(e) => {
debug!("see non-data frame {e:?}");
Ok(Bytes::new())
}
},
Err(e) => Err(sitem_err2_from_string(e)),
});
let inp = Box::pin(inp) as BoxedBytesStream;
// let inp = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream;
let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap());
let frames = frames.map_err(sitem_err2_from_string);
let frames = Box::pin(frames);
@@ -168,10 +216,11 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
pub async fn x_processed_event_blobs_stream_from_node(
subq: EventsSubQuery,
node: Node,
post: Box<dyn HttpSimplePost>,
ctx: ReqCtx,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
if true {
x_processed_event_blobs_stream_from_node_http(subq, node, &ctx).await
x_processed_event_blobs_stream_from_node_http(subq, node, post, &ctx).await
} else {
x_processed_event_blobs_stream_from_node_tcp(subq, node).await
}