Extend binned line to requested end

This commit is contained in:
Dominik Werder
2025-02-12 09:30:40 +01:00
parent d529d9dbc7
commit a967fbd08a
28 changed files with 548 additions and 292 deletions

View File

@@ -7,9 +7,8 @@ use crate::requests::accepts_octets;
use crate::ServiceSharedResources;
use daqbuf_err as err;
use dbconn::worker::PgQueue;
use err::thiserror;
use err::ThisError;
use http::header::CONTENT_TYPE;
use http::request::Parts;
use http::Method;
use http::StatusCode;
use httpclient::body_empty;
@@ -24,6 +23,7 @@ use httpclient::ToJsonBody;
use netpod::log::*;
use netpod::req_uri_to_url;
use netpod::timeunits::SEC;
use netpod::ChannelTypeConfigGen;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
@@ -40,25 +40,27 @@ use std::sync::Arc;
use streams::collect::CollectResult;
use streams::eventsplainreader::DummyCacheReadProvider;
use streams::eventsplainreader::SfDatabufferEventReadProvider;
use streams::streamtimeout::StreamTimeout2;
use streams::timebin::cached::reader::EventsReadProvider;
use streams::timebin::CacheReadProvider;
use tracing::Instrument;
use url::Url;
use tracing::Span;
#[derive(Debug, ThisError)]
#[cstm(name = "Api4Binned")]
pub enum Error {
ChannelNotFound,
BadQuery(String),
HttpLib(#[from] http::Error),
ChannelConfig(crate::channelconfig::Error),
Retrieval(#[from] crate::RetrievalError),
EventsCbor(#[from] streams::plaineventscbor::Error),
EventsJson(#[from] streams::plaineventsjson::Error),
ServerError,
BinnedStream(err::Error),
TimebinnedJson(#[from] streams::timebinnedjson::Error),
}
autoerr::create_error_v1!(
name(Error, "Api4Binned"),
enum variants {
ChannelNotFound,
BadQuery(String),
HttpLib(#[from] http::Error),
ChannelConfig(crate::channelconfig::Error),
Retrieval(#[from] crate::RetrievalError),
EventsCbor(#[from] streams::plaineventscbor::Error),
EventsJson(#[from] streams::plaineventsjson::Error),
ServerError,
BinnedStream(err::Error),
TimebinnedJson(#[from] streams::timebinnedjson::Error),
},
);
impl From<crate::channelconfig::Error> for Error {
fn from(value: crate::channelconfig::Error) -> Self {
@@ -104,7 +106,7 @@ impl BinnedHandler {
}
_ => {
error!("EventsHandler sees: {e}");
Ok(error_response(e.public_message(), ctx.reqid()))
Ok(error_response(e.to_string(), ctx.reqid()))
}
},
}
@@ -126,19 +128,61 @@ async fn binned(
{
Err(Error::ServerError)?;
}
if accepts_cbor_framed(req.headers()) {
Ok(binned_cbor_framed(url, req, ctx, pgqueue, scyqueue, ncc).await?)
} else if accepts_json_framed(req.headers()) {
Ok(binned_json_framed(url, req, ctx, pgqueue, scyqueue, ncc).await?)
} else if accepts_json_or_all(req.headers()) {
Ok(binned_json_single(url, req, ctx, pgqueue, scyqueue, ncc).await?)
} else if accepts_octets(req.headers()) {
let reqid = ctx.reqid();
let (head, _body) = req.into_parts();
let query = BinnedQuery::from_url(&url).map_err(|e| {
error!("binned_cbor_framed: {e:?}");
Error::BadQuery(e.to_string())
})?;
let logspan = if query.log_level() == "trace" {
trace!("enable trace for handler");
tracing::span!(tracing::Level::INFO, "log_span_trace")
} else if query.log_level() == "debug" {
debug!("enable debug for handler");
tracing::span!(tracing::Level::INFO, "log_span_debug")
} else {
tracing::Span::none()
};
let span1 = span!(
Level::INFO,
"httpret::binned_cbor_framed",
reqid,
beg = query.range().beg_u64() / SEC,
end = query.range().end_u64() / SEC,
ch = query.channel().name(),
);
span1.in_scope(|| {
debug!("binned begin {:?}", query);
});
binned_instrumented(head, ctx, query, pgqueue, scyqueue, ncc, logspan.clone())
.instrument(logspan)
.instrument(span1)
.await
}
async fn binned_instrumented(
head: Parts,
ctx: &ReqCtx,
query: BinnedQuery,
pgqueue: &PgQueue,
scyqueue: Option<ScyllaQueue>,
ncc: &NodeConfigCached,
logspan: Span,
) -> Result<StreamResponse, Error> {
let res2 = HandleRes2::new(ctx, logspan, query.clone(), pgqueue, scyqueue, ncc).await?;
if accepts_cbor_framed(&head.headers) {
Ok(binned_cbor_framed(res2, ctx, ncc).await?)
} else if accepts_json_framed(&head.headers) {
Ok(binned_json_framed(res2, ctx, ncc).await?)
} else if accepts_json_or_all(&head.headers) {
Ok(binned_json_single(res2, ctx, ncc).await?)
} else if accepts_octets(&head.headers) {
Ok(error_response(
format!("binary binned data not yet available"),
ctx.reqid(),
))
} else {
let ret = error_response(format!("Unsupported Accept: {:?}", req.headers()), ctx.reqid());
let ret = error_response(format!("Unsupported Accept: {:?}", &head.headers), ctx.reqid());
Ok(ret)
}
}
@@ -182,48 +226,20 @@ fn make_read_provider(
}
async fn binned_json_single(
url: Url,
req: Requ,
res2: HandleRes2<'_>,
ctx: &ReqCtx,
pgqueue: &PgQueue,
scyqueue: Option<ScyllaQueue>,
ncc: &NodeConfigCached,
_ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
// TODO unify with binned_json_framed
debug!("binned_json_single {:?}", req);
let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id();
let (_head, _body) = req.into_parts();
let query = BinnedQuery::from_url(&url).map_err(|e| {
error!("binned_json: {e:?}");
Error::BadQuery(e.to_string())
})?;
let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let span1 = span!(
Level::INFO,
"httpret::binned",
reqid,
beg = query.range().beg_u64() / SEC,
end = query.range().end_u64() / SEC,
ch = query.channel().name(),
);
span1.in_scope(|| {
debug!("begin");
});
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
debug!("binned_json_single");
let item = streams::timebinnedjson::timebinned_json(
query,
ch_conf,
res2.query,
res2.ch_conf,
ctx,
cache_read_provider,
events_read_provider,
timeout_provider,
res2.cache_read_provider,
res2.events_read_provider,
res2.timeout_provider,
)
.instrument(span1)
.await?;
match item {
CollectResult::Some(item) => {
@@ -245,50 +261,30 @@ async fn binned_json_single(
}
async fn binned_json_framed(
url: Url,
req: Requ,
res2: HandleRes2<'_>,
ctx: &ReqCtx,
pgqueue: &PgQueue,
scyqueue: Option<ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
debug!("binned_json_framed {:?}", req);
let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id();
let (_head, _body) = req.into_parts();
let query = BinnedQuery::from_url(&url).map_err(|e| {
error!("binned_json_framed: {e:?}");
Error::BadQuery(e.to_string())
})?;
debug!("binned_json_framed");
// TODO handle None case better and return 404
let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
let ch_conf = ch_conf_from_binned(&res2.query, ctx, res2.pgqueue, ncc)
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let span1 = span!(
Level::INFO,
"httpret::binned",
reqid,
beg = query.range().beg_u64() / SEC,
end = query.range().end_u64() / SEC,
ch = query.channel().name(),
);
span1.in_scope(|| {
debug!("begin");
});
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc);
make_read_provider(ch_conf.name(), res2.scyqueue, open_bytes, ctx, ncc);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let stream = streams::timebinnedjson::timebinned_json_framed(
query,
res2.query,
ch_conf,
ctx,
cache_read_provider,
events_read_provider,
timeout_provider,
)
.instrument(span1)
.await?;
let stream = streams::lenframe::bytes_chunks_to_len_framed_str(stream);
let stream = streams::instrument::InstrumentStream::new(stream, res2.logspan);
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_JSON_FRAMED)
.header(HEADER_NAME_REQUEST_ID, ctx.reqid())
@@ -297,53 +293,74 @@ async fn binned_json_framed(
}
async fn binned_cbor_framed(
url: Url,
req: Requ,
res2: HandleRes2<'_>,
ctx: &ReqCtx,
pgqueue: &PgQueue,
scyqueue: Option<ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
debug!("binned_cbor_framed {:?}", req);
let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id();
let (_head, _body) = req.into_parts();
let query = BinnedQuery::from_url(&url).map_err(|e| {
error!("binned_cbor_framed: {e:?}");
Error::BadQuery(e.to_string())
})?;
debug!("binned_cbor_framed");
// TODO handle None case better and return 404
let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
let ch_conf = ch_conf_from_binned(&res2.query, ctx, res2.pgqueue, ncc)
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let span1 = span!(
Level::INFO,
"httpret::binned_cbor_framed",
reqid,
beg = query.range().beg_u64() / SEC,
end = query.range().end_u64() / SEC,
ch = query.channel().name(),
);
span1.in_scope(|| {
debug!("begin");
});
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc);
make_read_provider(ch_conf.name(), res2.scyqueue, open_bytes, ctx, ncc);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let stream = streams::timebinnedjson::timebinned_cbor_framed(
query,
res2.query,
ch_conf,
ctx,
cache_read_provider,
events_read_provider,
timeout_provider,
)
.instrument(span1)
.await?;
let stream = streams::lenframe::bytes_chunks_to_framed(stream);
let stream = streams::instrument::InstrumentStream::new(stream, res2.logspan);
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_CBOR_FRAMED)
.header(HEADER_NAME_REQUEST_ID, ctx.reqid())
.body(body_stream(stream))?;
Ok(ret)
}
struct HandleRes2<'a> {
logspan: Span,
query: BinnedQuery,
ch_conf: ChannelTypeConfigGen,
events_read_provider: Arc<dyn EventsReadProvider>,
cache_read_provider: Arc<dyn CacheReadProvider>,
timeout_provider: Box<dyn StreamTimeout2>,
pgqueue: &'a PgQueue,
scyqueue: Option<ScyllaQueue>,
}
impl<'a> HandleRes2<'a> {
async fn new(
ctx: &ReqCtx,
logspan: Span,
query: BinnedQuery,
pgqueue: &'a PgQueue,
scyqueue: Option<ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<Self, Error> {
let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc);
let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
let ret = Self {
logspan,
query,
ch_conf,
events_read_provider,
cache_read_provider,
timeout_provider,
pgqueue,
scyqueue,
};
Ok(ret)
}
}