Report unknown channel count in status response

This commit is contained in:
Dominik Werder
2023-07-18 15:49:08 +02:00
parent 91947dec0f
commit 5d6358f12e
32 changed files with 596 additions and 518 deletions

View File

@@ -1,3 +1,4 @@
use crate::err::Error;
use crate::gather::gather_get_json_generic;
use crate::gather::SubRes;
use crate::response;
@@ -7,11 +8,8 @@ use bytes::BytesMut;
use disk::eventchunker::EventChunkerConf;
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
use disk::raw::conn::make_local_event_blobs_stream;
use err::Error;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http::Method;
use http::StatusCode;
use hyper::Body;
@@ -28,6 +26,7 @@ use netpod::log::*;
use netpod::query::api1::Api1Query;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::SEC;
use netpod::Api1WarningStats;
use netpod::ByteSize;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
@@ -456,7 +455,7 @@ pub async fn gather_json_2_v1(
struct Jres {
hosts: Vec<Hres>,
}
let mut a = vec![];
let mut a = Vec::new();
for tr in spawned {
let res = match tr.1.await {
Ok(k) => match k {
@@ -501,8 +500,6 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
s1
)))
} else {
//use snafu::IntoError;
//Err(Bad{msg:format!("API error")}.into_error(NoneError)).ctxb(SE!(AddPos))
Ok(JsonValue::String(format!("status {}", pre.status.as_str())))
}
} else {
@@ -516,15 +513,6 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
}
}
async fn find_ch_conf(
range: NanoRange,
channel: SfDbChannel,
ncc: NodeConfigCached,
) -> Result<Option<ChannelTypeConfigGen>, Error> {
let ret = nodenet::channelconfig::channel_config(range, channel, &ncc).await?;
Ok(ret)
}
pub struct DataApiPython3DataStream {
range: NanoRange,
channels: VecDeque<ChannelTypeConfigGen>,
@@ -533,7 +521,6 @@ pub struct DataApiPython3DataStream {
current_fetch_info: Option<SfChFetchInfo>,
node_config: NodeConfigCached,
chan_stream: Option<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>>,
config_fut: Option<Pin<Box<dyn Future<Output = Result<Option<ChannelTypeConfigGen>, Error>> + Send>>>,
disk_io_tune: DiskIoTune,
do_decompress: bool,
event_count: usize,
@@ -543,6 +530,7 @@ pub struct DataApiPython3DataStream {
ping_last: Instant,
data_done: bool,
completed: bool,
stats: Api1WarningStats,
}
impl DataApiPython3DataStream {
@@ -564,7 +552,6 @@ impl DataApiPython3DataStream {
current_fetch_info: None,
node_config,
chan_stream: None,
config_fut: None,
disk_io_tune,
do_decompress,
event_count: 0,
@@ -574,9 +561,16 @@ impl DataApiPython3DataStream {
ping_last: Instant::now(),
data_done: false,
completed: false,
stats: Api1WarningStats::new(),
}
}
fn channel_finished(&mut self) {
self.chan_stream = None;
self.header_out = false;
self.event_count = 0;
}
fn convert_item(
b: EventFull,
channel: &ChannelTypeConfigGen,
@@ -588,7 +582,7 @@ impl DataApiPython3DataStream {
let shape = fetch_info.shape();
let mut d = BytesMut::new();
for i1 in 0..b.len() {
const EVIMAX: usize = 6;
const EVIMAX: usize = 20;
if *count_events < EVIMAX {
debug!(
"ev info {}/{} bloblen {:?} BE {:?} scalar-type {:?} shape {:?} comps {:?}",
@@ -662,57 +656,83 @@ impl DataApiPython3DataStream {
Ok(d)
}
fn handle_chan_stream_ready(&mut self, item: Sitemty<EventFull>) -> Option<Result<BytesMut, Error>> {
match item {
Ok(k) => {
let n = Instant::now();
if n.duration_since(self.ping_last) >= Duration::from_millis(2000) {
let mut sb = crate::status_board().unwrap();
sb.mark_alive(self.reqctx.reqid());
self.ping_last = n;
}
match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::RangeComplete => todo!(),
RangeCompletableItem::Data(k) => {
let item = Self::convert_item(
k,
self.current_channel.as_ref().unwrap(),
self.current_fetch_info.as_ref().unwrap(),
self.do_decompress,
&mut self.header_out,
&mut self.event_count,
)?;
todo!()
fn handle_chan_stream_ready(&mut self, item: Sitemty<EventFull>) -> Result<BytesMut, Error> {
let ret = match item {
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::RangeComplete => {
debug!("sees RangeComplete");
Ok(BytesMut::new())
}
RangeCompletableItem::Data(k) => {
self.event_count += k.len();
if self.events_max != 0 && self.event_count >= self.events_max as usize {
return Err(Error::with_msg_no_trace(format!(
"events_max reached {} {}",
self.event_count, self.events_max
)));
}
},
StreamItem::Log(k) => todo!(),
StreamItem::Stats(k) => todo!(),
// NOTE needed because the databuffer actually doesn't write
// the correct shape per event.
let mut k = k;
if let Some(fi) = self.current_fetch_info.as_ref() {
if let Shape::Scalar = fi.shape() {
} else {
k.overwrite_all_shapes(fi.shape());
}
}
let k = k;
let item = Self::convert_item(
k,
self.current_channel.as_ref().unwrap(),
self.current_fetch_info.as_ref().unwrap(),
self.do_decompress,
&mut self.header_out,
&mut self.event_count,
)?;
Ok(item)
}
},
StreamItem::Log(k) => {
let nodeix = k.node_ix;
if k.level == Level::ERROR {
tracing::event!(Level::ERROR, nodeix, message = k.msg);
} else if k.level == Level::WARN {
tracing::event!(Level::WARN, nodeix, message = k.msg);
} else if k.level == Level::INFO {
tracing::event!(Level::INFO, nodeix, message = k.msg);
} else if k.level == Level::DEBUG {
tracing::event!(Level::DEBUG, nodeix, message = k.msg);
} else if k.level == Level::TRACE {
tracing::event!(Level::TRACE, nodeix, message = k.msg);
} else {
tracing::event!(Level::TRACE, nodeix, message = k.msg);
}
Ok(BytesMut::new())
}
}
StreamItem::Stats(k) => {
//
Ok(BytesMut::new())
}
},
Err(e) => {
error!("DataApiPython3DataStream emit error: {e:?}");
self.chan_stream = None;
self.current_channel = None;
self.current_fetch_info = None;
self.data_done = true;
let mut sb = crate::status_board().unwrap();
sb.add_error(self.reqctx.reqid(), e);
if false {
// TODO format as python data api error frame:
let mut buf = BytesMut::with_capacity(1024);
buf.put_slice("".as_bytes());
Some(Ok(buf))
} else {
None
}
error!("DataApiPython3DataStream emit error: {e}");
Err(e.into())
}
};
let tsnow = Instant::now();
if tsnow.duration_since(self.ping_last) >= Duration::from_millis(500) {
self.ping_last = tsnow;
let mut sb = crate::status_board().unwrap();
sb.mark_alive(self.reqctx.reqid());
}
ret
}
// TODO this stream can currently only handle sf-databuffer type backend anyway.
fn handle_config_fut_ready(&mut self, fetch_info: SfChFetchInfo) -> Result<(), Error> {
self.config_fut = None;
let select = EventsSubQuerySelect::new(
ChannelTypeConfigGen::SfDatabuffer(fetch_info.clone()),
self.range.clone().into(),
@@ -726,7 +746,7 @@ impl DataApiPython3DataStream {
debug!("TODO add timeout option to data api3 download");
// TODO is this a good to place decide this?
let stream = if self.node_config.node_config.cluster.is_central_storage {
info!("Set up central storage stream");
debug!("set up central storage stream");
// TODO pull up this config
let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024));
let s = make_local_event_blobs_stream(
@@ -744,11 +764,6 @@ impl DataApiPython3DataStream {
let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone());
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
};
let evm = if self.events_max == 0 {
usize::MAX
} else {
self.events_max as usize
};
self.chan_stream = Some(Box::pin(stream));
self.current_fetch_info = Some(fetch_info);
Ok(())
@@ -769,63 +784,52 @@ impl Stream for DataApiPython3DataStream {
} else {
if let Some(stream) = &mut self.chan_stream {
match stream.poll_next_unpin(cx) {
Ready(Some(k)) => Ready(self.handle_chan_stream_ready(k)),
Ready(None) => {
self.chan_stream = None;
continue;
}
Pending => Pending,
}
} else if let Some(fut) = &mut self.config_fut {
match fut.poll_unpin(cx) {
Ready(Ok(Some(k))) => match k {
ChannelTypeConfigGen::Scylla(_) => {
let e = Error::with_msg_no_trace("scylla");
Ready(Some(k)) => match self.handle_chan_stream_ready(k) {
Ok(k) => Ready(Some(Ok(k))),
Err(e) => {
error!("{e}");
self.chan_stream = None;
self.current_channel = None;
self.current_fetch_info = None;
self.data_done = true;
let mut sb = crate::status_board().unwrap();
sb.add_error(self.reqctx.reqid(), e.0.clone());
Ready(Some(Err(e)))
}
ChannelTypeConfigGen::SfDatabuffer(k) => match self.handle_config_fut_ready(k) {
Ok(()) => continue,
Err(e) => {
self.config_fut = None;
self.data_done = true;
error!("api1_binary_events error {:?}", e);
Ready(Some(Err(e)))
}
},
},
Ready(Ok(None)) => {
warn!("logic error");
self.config_fut = None;
Ready(None) => {
self.channel_finished();
continue;
}
Ready(Err(e)) => {
self.data_done = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
} else {
if let Some(channel) = self.channels.pop_front() {
self.current_channel = Some(channel.clone());
if false {
self.config_fut = Some(Box::pin(find_ch_conf(
self.range.clone(),
err::todoval(),
self.node_config.clone(),
)));
if let Some(chconf) = self.channels.pop_front() {
match &chconf {
ChannelTypeConfigGen::Scylla(_) => {
// TODO count
continue;
}
ChannelTypeConfigGen::SfDatabuffer(k) => match self.handle_config_fut_ready(k.clone()) {
Ok(()) => {
self.current_channel = Some(chconf.clone());
continue;
}
Err(e) => {
error!("api1_binary_events error {:?}", e);
self.stats.subreq_fail += 1;
continue;
}
},
}
self.config_fut = Some(Box::pin(futures_util::future::ready(Ok(Some(channel)))));
continue;
} else {
self.data_done = true;
{
let n = Instant::now();
self.ping_last = n;
let mut sb = crate::status_board().unwrap();
sb.mark_alive(self.reqctx.reqid());
self.ping_last = n;
sb.mark_ok(self.reqctx.reqid());
sb.mark_done(self.reqctx.reqid());
}
continue;
}
@@ -958,7 +962,14 @@ impl Api1EventsBinaryHandler {
}
None => {
// TODO count in request ctx.
// TODO must already here have the final stats counter container.
// This means, the request status must provide these counters.
error!("no config quorum found for {ch:?}");
let mut sb = crate::status_board().unwrap();
sb.mark_alive(reqctx.reqid());
if let Some(e) = sb.get_entry(reqctx.reqid()) {
e.channel_not_found_inc();
}
}
}
}
@@ -1005,7 +1016,7 @@ impl RequestStatusHandler {
}
}
pub async fn handle(&self, req: Request<Body>, _node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Request<Body>, _ncc: &NodeConfigCached) -> Result<Response<Body>, Error> {
let (head, body) = req.into_parts();
if head.method != Method::GET {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
@@ -1024,8 +1035,9 @@ impl RequestStatusHandler {
}
let _body_data = hyper::body::to_bytes(body).await?;
let status_id = &head.uri.path()[Self::path_prefix().len()..];
info!("RequestStatusHandler status_id {:?}", status_id);
let s = crate::status_board()?.status_as_json(status_id);
debug!("RequestStatusHandler status_id {:?}", status_id);
let status = crate::status_board()?.status_as_json(status_id);
let s = serde_json::to_string(&status)?;
let ret = response(StatusCode::OK).body(Body::from(s))?;
Ok(ret)
}