Add status board for post-fetch response status (for py data api 3)

This commit is contained in:
Dominik Werder
2022-03-02 09:55:11 +01:00
parent aa6db72e8e
commit 0b741d187e
7 changed files with 385 additions and 147 deletions

View File

@@ -9,8 +9,9 @@ use http::{Method, StatusCode};
use hyper::{Body, Client, Request, Response};
use items::{RangeCompletableItem, Sitemty, StreamItem};
use itertools::Itertools;
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::timeunits::SEC;
use netpod::{log::*, ACCEPT_ALL};
use netpod::{ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET};
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
use parse::channelconfig::{
@@ -21,7 +22,7 @@ use serde_json::Value as JsonValue;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::{Duration, Instant};
use url::Url;
pub trait BackendAware {
@@ -500,11 +501,21 @@ pub struct Api1Range {
pub struct Api1Query {
channels: Vec<String>,
range: Api1Range,
// The following are unofficial and not-to-be-used parameters:
// All following parameters are private and not to be used
#[serde(rename = "fileIoBufferSize", default)]
file_io_buffer_size: Option<FileIoBufferSize>,
#[serde(default)]
decompress: bool,
#[serde(rename = "eventsMax", default = "u64_max", skip_serializing_if = "is_u64_max")]
events_max: u64,
}
fn u64_max() -> u64 {
u64::MAX
}
fn is_u64_max(x: &u64) -> bool {
*x == u64::MAX
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -527,6 +538,11 @@ pub struct DataApiPython3DataStream {
config_fut: Option<Pin<Box<dyn Future<Output = Result<Config, Error>> + Send>>>,
file_io_buffer_size: FileIoBufferSize,
do_decompress: bool,
#[allow(unused)]
event_count: u64,
events_max: u64,
status_id: String,
ping_last: Instant,
data_done: bool,
completed: bool,
}
@@ -537,6 +553,8 @@ impl DataApiPython3DataStream {
channels: Vec<Channel>,
file_io_buffer_size: FileIoBufferSize,
do_decompress: bool,
events_max: u64,
status_id: String,
node_config: NodeConfigCached,
) -> Self {
Self {
@@ -548,6 +566,10 @@ impl DataApiPython3DataStream {
config_fut: None,
file_io_buffer_size,
do_decompress,
event_count: 0,
events_max,
status_id,
ping_last: Instant::now(),
data_done: false,
completed: false,
}
@@ -562,9 +584,12 @@ impl DataApiPython3DataStream {
) -> Result<BytesMut, Error> {
let mut d = BytesMut::new();
for i1 in 0..b.tss.len() {
if *count_events < 6 {
const EVIMAX: usize = 6;
if *count_events < EVIMAX {
info!(
"deco len {:?} BE {} scalar-type {:?} shape {:?} comps {:?}",
"ev info {}/{} decomps len {:?} BE {:?} scalar-type {:?} shape {:?} comps {:?}",
*count_events + 1,
EVIMAX,
b.decomps[i1].as_ref().map(|x| x.len()),
b.be[i1],
b.scalar_types[i1],
@@ -657,10 +682,26 @@ impl Stream for DataApiPython3DataStream {
match stream.poll_next_unpin(cx) {
Ready(k) => match k {
Some(k) => match k {
Ok(k) => Ready(Some(Ok(k))),
Ok(k) => {
let n = Instant::now();
if n.duration_since(self.ping_last) >= Duration::from_millis(2000) {
let mut sb = crate::status_board().unwrap();
sb.mark_alive(&self.status_id);
self.ping_last = n;
}
Ready(Some(Ok(k)))
}
Err(e) => {
error!("DataApiPython3DataStream emit error: {e:?}");
self.data_done = true;
Ready(Some(Err(e)))
let mut sb = crate::status_board().unwrap();
sb.add_error(&self.status_id, e);
// TODO format as python data api error frame:
//let mut buf = BytesMut::with_capacity(1024);
//buf.put_slice("".as_bytes());
//Ready(Some(Ok(buf)))
self.data_done = true;
Ready(None)
}
},
None => {
@@ -749,7 +790,12 @@ impl Stream for DataApiPython3DataStream {
}
});
//let _ = Box::new(s) as Box<dyn Stream<Item = Result<BytesMut, Error>> + Unpin>;
self.chan_stream = Some(Box::pin(s.map_err(Error::from)));
let evm = if self.events_max == 0 {
usize::MAX
} else {
self.events_max as usize
};
self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm)));
continue;
}
Ready(Err(e)) => {
@@ -778,70 +824,6 @@ impl Stream for DataApiPython3DataStream {
}
}
pub async fn api1_binary_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
debug!("api1_binary_events uri: {:?} headers: {:?}", req.uri(), req.headers());
let accept_def = "";
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def))
.to_owned();
let (_head, body) = req.into_parts();
let body_data = hyper::body::to_bytes(body).await?;
let qu: Api1Query = if let Ok(qu) = serde_json::from_slice(&body_data) {
qu
} else {
error!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec()));
return Err(Error::with_msg_no_trace("can not parse query"));
};
debug!("got Api1Query: {:?}", qu);
let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date);
let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date);
let beg_date = beg_date?;
let end_date = end_date?;
debug!("beg_date {:?} end_date {:?}", beg_date, end_date);
//let url = Url::parse(&format!("dummy:{}", req.uri()))?;
//let query = PlainEventsBinaryQuery::from_url(&url)?;
// TODO add stricter check for types, check with client.
if accept == APP_OCTET {}
if false {
let e = Error::with_msg(format!("unexpected Accept: {:?}", accept));
error!("{:?}", e);
return Err(e);
}
let beg_ns = beg_date.timestamp() as u64 * 1000000000 + beg_date.timestamp_subsec_nanos() as u64;
let end_ns = end_date.timestamp() as u64 * 1000000000 + end_date.timestamp_subsec_nanos() as u64;
let range = NanoRange {
beg: beg_ns,
end: end_ns,
};
// TODO use the proper backend name:
let backend = "DUMMY";
let chans = qu
.channels
.iter()
.map(|x| Channel {
backend: backend.into(),
name: x.clone(),
})
.collect();
let file_io_buffer_size = if let Some(k) = qu.file_io_buffer_size {
k
} else {
node_config.node_config.cluster.file_io_buffer_size.clone()
};
let s = DataApiPython3DataStream::new(
range.clone(),
chans,
file_io_buffer_size,
qu.decompress,
node_config.clone(),
);
let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", "dummy");
let ret = ret.body(BodyStream::wrapped(s, format!("api1_binary_events")))?;
return Ok(ret);
}
fn shape_to_api3proto(sh: &Option<Vec<u32>>) -> Vec<u32> {
match sh {
None => vec![],
@@ -856,3 +838,91 @@ fn shape_to_api3proto(sh: &Option<Vec<u32>>) -> Vec<u32> {
}
}
}
pub struct Api1EventsBinaryHandler {}
impl Api1EventsBinaryHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/1/query" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!(
"Api1EventsBinaryHandler::handle uri: {:?} headers: {:?}",
req.uri(),
req.headers()
);
if req.method() != Method::POST {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
}
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
.to_owned();
let (_head, body) = req.into_parts();
let body_data = hyper::body::to_bytes(body).await?;
let qu: Api1Query = if let Ok(qu) = serde_json::from_slice(&body_data) {
qu
} else {
error!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec()));
return Err(Error::with_msg_no_trace("can not parse query"));
};
info!(
"Api1Query {:?} {} {:?}",
qu.range,
qu.channels.len(),
qu.channels.first()
);
let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date);
let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date);
let beg_date = beg_date?;
let end_date = end_date?;
info!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date);
//let url = Url::parse(&format!("dummy:{}", req.uri()))?;
//let query = PlainEventsBinaryQuery::from_url(&url)?;
if accept != APP_OCTET && accept != ACCEPT_ALL {
// TODO set the public error code and message and return Err(e).
let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
error!("{e:?}");
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
let beg = beg_date.timestamp() as u64 * SEC + beg_date.timestamp_subsec_nanos() as u64;
let end = end_date.timestamp() as u64 * SEC + end_date.timestamp_subsec_nanos() as u64;
let range = NanoRange { beg, end };
// TODO check for valid given backend name:
let backend = &node_config.node_config.cluster.backend;
let chans = qu
.channels
.iter()
.map(|x| Channel {
backend: backend.into(),
name: x.clone(),
})
.collect();
let file_io_buffer_size = if let Some(k) = qu.file_io_buffer_size {
k
} else {
node_config.node_config.cluster.file_io_buffer_size.clone()
};
// TODO use a better stream protocol with built-in error delivery.
let status_id = super::status_board()?.new_status_id();
let s = DataApiPython3DataStream::new(
range.clone(),
chans,
file_io_buffer_size,
qu.decompress,
qu.events_max,
status_id.clone(),
node_config.clone(),
);
let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", status_id);
let ret = ret.body(BodyStream::wrapped(s, format!("Api1EventsBinaryHandler")))?;
Ok(ret)
}
}