This commit is contained in:
Dominik Werder
2024-11-26 16:29:16 +01:00
parent f168280d18
commit 927ef9ca55
8 changed files with 97 additions and 175 deletions

View File

@@ -1,4 +1,3 @@
use crate::api4::events::bytes_chunks_to_len_framed_str;
use crate::bodystream::response;
use crate::channelconfig::ch_conf_from_binned;
use crate::requests::accepts_json_framed;
@@ -40,6 +39,7 @@ use std::sync::Arc;
use streams::collect::CollectResult;
use streams::eventsplainreader::DummyCacheReadProvider;
use streams::eventsplainreader::SfDatabufferEventReadProvider;
use streams::lenframe::bytes_chunks_to_len_framed_str;
use streams::timebin::cached::reader::EventsReadProvider;
use streams::timebin::CacheReadProvider;
use tracing::Instrument;

View File

@@ -4,16 +4,10 @@ use crate::requests::accepts_json_framed;
use crate::requests::accepts_json_or_all;
use crate::response;
use crate::ServiceSharedResources;
use bytes::Bytes;
use bytes::BytesMut;
use daqbuf_err as err;
use dbconn::worker::PgQueue;
use err::thiserror;
use err::ThisError;
use futures_util::future;
use futures_util::stream;
use futures_util::Stream;
use futures_util::StreamExt;
use http::header::CONTENT_TYPE;
use http::Method;
use http::StatusCode;
@@ -41,6 +35,10 @@ use query::api4::events::PlainEventsQuery;
use std::sync::Arc;
use streams::collect::CollectResult;
use streams::instrument::InstrumentStream;
use streams::lenframe::bytes_chunks_to_framed;
use streams::lenframe::bytes_chunks_to_len_framed_str;
use streams::plaineventscbor::plain_events_cbor_stream;
use streams::plaineventsjson::plain_events_json_stream;
use tracing::Instrument;
#[derive(Debug, ThisError)]
@@ -179,8 +177,7 @@ async fn plain_events_cbor_framed(
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let open_bytes = Arc::pin(open_bytes);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let stream =
streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?;
let stream = plain_events_cbor_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?;
let stream = bytes_chunks_to_framed(stream);
let logspan = if evq.log_level() == "trace" {
trace!("enable trace for handler");
@@ -210,8 +207,7 @@ async fn plain_events_json_framed(
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let open_bytes = Arc::pin(open_bytes);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let stream =
streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?;
let stream = plain_events_json_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?;
let stream = bytes_chunks_to_len_framed_str(stream);
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_JSON_FRAMED)
@@ -271,58 +267,3 @@ async fn plain_events_json(
}
}
}
fn bytes_chunks_to_framed<S, T, E>(stream: S) -> impl Stream<Item = Result<Bytes, E>>
where
S: Stream<Item = Result<T, E>>,
T: Into<Bytes>,
E: std::error::Error,
{
use future::ready;
stream
// TODO unify this map to padded bytes for both json and cbor output
.flat_map(|x| match x {
Ok(y) => {
use bytes::BufMut;
let buf = y.into();
let adv = (buf.len() + 7) / 8 * 8;
let pad = adv - buf.len();
let mut b2 = BytesMut::with_capacity(16);
b2.put_u32_le(buf.len() as u32);
b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let mut b3 = BytesMut::with_capacity(16);
b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]);
stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())])
}
Err(e) => {
error!("{e}");
stream::iter([Ok(Bytes::new()), Ok(Bytes::new()), Ok(Bytes::new())])
}
})
.filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) })
}
// TODO move this, it's also used by binned.
pub fn bytes_chunks_to_len_framed_str<S, T, E>(stream: S) -> impl Stream<Item = Result<String, E>>
where
S: Stream<Item = Result<T, E>>,
T: Into<String>,
E: std::error::Error,
{
use future::ready;
stream
.flat_map(|x| match x {
Ok(y) => {
use std::fmt::Write;
let s = y.into();
let mut b2 = String::with_capacity(16);
write!(b2, "{:15}\n", s.len()).unwrap();
stream::iter([Ok::<_, E>(b2), Ok(s), Ok(String::from("\n"))])
}
Err(e) => {
error!("{e}");
stream::iter([Ok(String::new()), Ok(String::new()), Ok(String::new())])
}
})
.filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) })
}