From 0b741d187e3966f2c7cfd641dbd8767dd9264bf3 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 2 Mar 2022 09:55:11 +0100 Subject: [PATCH] Add status board for post-fetch response status (for py data api 3) --- disk/src/eventblobs.rs | 44 +++++--- disk/src/eventchunker.rs | 58 ++++++----- disk/src/merge.rs | 9 +- dq/src/bin/dq.rs | 4 - err/src/lib.rs | 42 ++++---- httpret/src/api1.rs | 214 ++++++++++++++++++++++++++------------- httpret/src/httpret.rs | 161 +++++++++++++++++++++++++++-- 7 files changed, 385 insertions(+), 147 deletions(-) diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index b41a5e1..fd39e93 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -10,8 +10,6 @@ use netpod::timeunits::SEC; use netpod::{log::*, FileIoBufferSize}; use netpod::{ChannelConfig, NanoRange, Node}; use std::pin::Pin; -use std::sync::atomic::AtomicU64; -use std::sync::Arc; use std::task::{Context, Poll}; use streams::rangefilter::RangeFilter; @@ -29,11 +27,11 @@ pub struct EventChunkerMultifile { data_completed: bool, errored: bool, completed: bool, - max_ts: Arc, files_count: u32, node_ix: usize, expand: bool, do_decompress: bool, + max_ts: u64, } impl EventChunkerMultifile { @@ -62,11 +60,11 @@ impl EventChunkerMultifile { data_completed: false, errored: false, completed: false, - max_ts: Arc::new(AtomicU64::new(0)), files_count: 0, node_ix, expand, do_decompress, + max_ts: 0, } } } @@ -81,7 +79,7 @@ impl Stream for EventChunkerMultifile { use Poll::*; 'outer: loop { break if self.completed { - panic!("EventBlobsComplete poll_next on completed"); + panic!("EventChunkerMultifile poll_next on completed"); } else if self.errored { self.completed = true; return Ready(None); @@ -93,11 +91,35 @@ impl Stream for EventChunkerMultifile { Some(evs) => match evs.poll_next_unpin(cx) { Ready(Some(k)) => { if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))) = &k { - if false { - info!("EventChunkerMultifile emit {} events", h.tss.len()); - }; + if let Some(&g) = h.tss.last() { + if g == self.max_ts { + let msg = format!("EventChunkerMultifile repeated ts {}", g); + error!("{}", msg); + let e = Error::with_msg(msg); + self.errored = true; + Ready(Some(Err(e))) + } else if g < self.max_ts { + let msg = format!("EventChunkerMultifile unordered ts {}", g); + error!("{}", msg); + let e = Error::with_msg(msg); + self.errored = true; + Ready(Some(Err(e))) + } else { + self.max_ts = g; + if true { + info!("EventChunkerMultifile emit {} events", h.tss.len()); + } + Ready(Some(k)) + } + } else { + Ready(Some(k)) + } + } else if let Err(_) = &k { + self.errored = true; + Ready(Some(k)) + } else { + Ready(Some(k)) } - Ready(Some(k)) } Ready(None) => { self.evs = None; @@ -128,7 +150,6 @@ impl Stream for EventChunkerMultifile { self.range.clone(), self.event_chunker_conf.clone(), path, - self.max_ts.clone(), self.expand, self.do_decompress, ); @@ -161,7 +182,6 @@ impl Stream for EventChunkerMultifile { self.range.clone(), self.event_chunker_conf.clone(), of.path, - self.max_ts.clone(), self.expand, self.do_decompress, ); @@ -184,7 +204,7 @@ impl Stream for EventChunkerMultifile { let item = LogItem::quick( Level::INFO, format!( - "EventBlobsComplete used {} datafiles beg {} end {} node_ix {}", + "EventChunkerMultifile used {} datafiles beg {} end {} node_ix {}", self.files_count, self.range.beg / SEC, self.range.end / SEC, diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index d7730cc..d4e2532 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ 
-15,8 +15,6 @@ use parse::channelconfig::CompressionMethod; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::path::PathBuf; use std::pin::Pin; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Instant; @@ -35,7 +33,7 @@ pub struct EventChunker { final_stats_sent: bool, parsed_bytes: u64, dbg_path: PathBuf, - max_ts: Arc, + max_ts: u64, expand: bool, do_decompress: bool, decomp_dt_histo: HistoLog2, @@ -43,6 +41,7 @@ pub struct EventChunker { seen_before_range_count: usize, seen_after_range_count: usize, unordered_warn_count: usize, + repeated_ts_warn_count: usize, } impl Drop for EventChunker { @@ -84,7 +83,6 @@ impl EventChunker { range: NanoRange, stats_conf: EventChunkerConf, dbg_path: PathBuf, - max_ts: Arc, expand: bool, do_decompress: bool, ) -> Self { @@ -106,7 +104,7 @@ impl EventChunker { final_stats_sent: false, parsed_bytes: 0, dbg_path, - max_ts, + max_ts: 0, expand, do_decompress, decomp_dt_histo: HistoLog2::new(8), @@ -114,6 +112,7 @@ impl EventChunker { seen_before_range_count: 0, seen_after_range_count: 0, unordered_warn_count: 0, + repeated_ts_warn_count: 0, } } @@ -124,20 +123,10 @@ impl EventChunker { range: NanoRange, stats_conf: EventChunkerConf, dbg_path: PathBuf, - max_ts: Arc, expand: bool, do_decompress: bool, ) -> Self { - let mut ret = Self::from_start( - inp, - channel_config, - range, - stats_conf, - dbg_path, - max_ts, - expand, - do_decompress, - ); + let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); @@ -205,24 +194,41 @@ impl EventChunker { let _ttl = sl.read_i64::().unwrap(); let ts = sl.read_i64::().unwrap() as u64; let pulse = sl.read_i64::().unwrap() as u64; - let max_ts = self.max_ts.load(Ordering::SeqCst); - if ts < max_ts { + if ts == self.max_ts { + if self.repeated_ts_warn_count < 20 { + let msg = format!( + "EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", + self.repeated_ts_warn_count, + ts / SEC, + ts % SEC, + self.max_ts / SEC, + self.max_ts % SEC, + self.channel_config.shape, + self.dbg_path + ); + warn!("{}", msg); + self.repeated_ts_warn_count += 1; + } + } + if ts < self.max_ts { if self.unordered_warn_count < 20 { let msg = format!( - "unordered event no {} ts: {}.{} max_ts {}.{} config {:?} path {:?}", + "EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", self.unordered_warn_count, ts / SEC, ts % SEC, - max_ts / SEC, - max_ts % SEC, + self.max_ts / SEC, + self.max_ts % SEC, self.channel_config.shape, self.dbg_path ); warn!("{}", msg); self.unordered_warn_count += 1; + let e = Error::with_public_msg_no_trace(msg); + return Err(e); } } - self.max_ts.store(ts, Ordering::SeqCst); + self.max_ts = ts; if ts >= self.range.end { self.seen_after_range_count += 1; if !self.expand || self.seen_after_range_count >= 2 { @@ -234,8 +240,8 @@ impl EventChunker { if ts < self.range.beg { self.seen_before_range_count += 1; if self.seen_before_range_count > 1 { - let e = Error::with_msg(format!( - "seen before range: event ts: {}.{} range beg: {}.{} range end: {}.{} pulse {} config {:?} path {:?}", + let msg = format!( + "seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}", ts / SEC, ts % SEC, self.range.beg / SEC, @@ -245,7 +251,9 @@ impl EventChunker { pulse, self.channel_config.shape, self.dbg_path - )); + ); + warn!("{}", msg); + 
let e = Error::with_public_msg(msg); Err(e)?; } } diff --git a/disk/src/merge.rs b/disk/src/merge.rs index abd308a..1f189f8 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -220,7 +220,7 @@ where if lowest_ts < self.ts_last_emit { self.errored = true; let msg = format!("unordered event at lowest_ts {}", lowest_ts); - return Ready(Some(Err(Error::with_msg(msg)))); + return Ready(Some(Err(Error::with_public_msg(msg)))); } else { self.ts_last_emit = self.ts_last_emit.max(lowest_ts); } @@ -296,12 +296,11 @@ mod test { use err::Error; use futures_util::StreamExt; use items::{RangeCompletableItem, StreamItem}; + use netpod::log::*; + use netpod::test_data_base_path_databuffer; use netpod::timeunits::{DAY, MS}; - use netpod::{log::*, test_data_base_path_databuffer}; use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, FileIoBufferSize, NanoRange, Nanos, ScalarType, Shape}; use std::path::PathBuf; - use std::sync::atomic::AtomicU64; - use std::sync::Arc; fn scalar_file_path() -> PathBuf { test_data_base_path_databuffer() @@ -358,7 +357,6 @@ mod test { let stats_conf = EventChunkerConf { disk_stats_every: ByteSize::kb(1024), }; - let max_ts = Arc::new(AtomicU64::new(0)); let expand = false; let do_decompress = false; let dbg_path = PathBuf::from("/dbg/dummy"); @@ -372,7 +370,6 @@ mod test { range.clone(), stats_conf, dbg_path, - max_ts, expand, do_decompress, ); diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index 7c84447..8867129 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -8,8 +8,6 @@ use netpod::log::*; use netpod::timeunits::*; use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Shape}; use std::path::PathBuf; -use std::sync::atomic::AtomicU64; -use std::sync::Arc; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -99,14 +97,12 @@ pub fn main() -> Result<(), Error> { let stats_conf = EventChunkerConf { disk_stats_every: ByteSize::mb(2), }; - let max_ts = Arc::new(AtomicU64::new(0)); let chunks = disk::eventchunker::EventChunker::from_start( inp, channel_config.clone(), range, stats_conf, sub.datafile.clone(), - max_ts.clone(), false, true, ); diff --git a/err/src/lib.rs b/err/src/lib.rs index fa6a6ff..6c8f1d9 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -23,39 +23,25 @@ The common error type for this application. 
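The eventblobs.rs and eventchunker.rs changes above share one theme: the max_ts watermark moves from a shared Arc<AtomicU64> into a plain per-stream u64, and a repeated or backwards timestamp now terminates the stream with an error instead of only logging. A minimal distillation of the batch-level guard, with illustrative names (TsWatermark is not a type in this patch):

```rust
/// Sketch of the check EventChunkerMultifile::poll_next now applies:
/// the last timestamp of each emitted batch must strictly exceed the
/// highest timestamp seen so far on this stream.
pub struct TsWatermark {
    max_ts: u64,
}

pub enum TsVerdict {
    Ok,
    Repeated(u64),
    Unordered(u64),
}

impl TsWatermark {
    pub fn new() -> Self {
        Self { max_ts: 0 }
    }

    pub fn check_batch(&mut self, tss: &[u64]) -> TsVerdict {
        match tss.last() {
            // A batch without timestamps carries nothing to validate.
            None => TsVerdict::Ok,
            Some(&g) if g == self.max_ts => TsVerdict::Repeated(g),
            Some(&g) if g < self.max_ts => TsVerdict::Unordered(g),
            Some(&g) => {
                self.max_ts = g;
                TsVerdict::Ok
            }
        }
    }
}
```

One edge case carried over from the patch: max_ts starts at 0, so a first event with timestamp 0 would count as repeated. Timestamps here are nanoseconds since the epoch, so that should not occur on real data.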
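The merge.rs hunk only swaps Error::with_msg for Error::with_public_msg, but that is the point of the err crate refactor that follows: the internal message and backtrace stay separate from a public_msg list that is safe to surface to API clients, and eventchunker.rs above uses the same split for its unordered-timestamp and before-range errors. A reduced model of the two constructors (SketchError is illustrative; backtrace capture is elided):

```rust
/// Cut-down model of the msg / public_msg split in err::Error.
/// The internal msg may contain paths and config details; only
/// public_msg entries are meant to reach the HTTP client.
#[derive(Debug)]
pub struct SketchError {
    msg: String,
    public_msg: Option<Vec<String>>,
}

impl SketchError {
    pub fn with_msg_no_trace<S: Into<String>>(s: S) -> Self {
        Self {
            msg: s.into(),
            public_msg: None,
        }
    }

    /// Mirrors err::Error::with_public_msg: the internal msg stays
    /// empty and the text goes into the client-visible list.
    pub fn with_public_msg<S: Into<String>>(s: S) -> Self {
        let mut ret = Self::with_msg_no_trace(String::new());
        ret.public_msg = Some(vec![s.into()]);
        ret
    }

    pub fn public_msg(&self) -> Option<&[String]> {
        self.public_msg.as_deref()
    }
}
```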
#[derive(Serialize, Deserialize)] pub struct Error { msg: String, - #[serde(skip)] - trace: Option, trace_str: Option, public_msg: Option>, reason: Option, + parent: Option>, } impl Error { - pub fn with_msg>(s: S) -> Self { - Self { - msg: s.into(), - trace: None, - trace_str: Some(fmt_backtrace(&backtrace::Backtrace::new())), - public_msg: None, - reason: None, - } - } - pub fn with_msg_no_trace>(s: S) -> Self { Self { msg: s.into(), - trace: None, trace_str: None, public_msg: None, reason: None, + parent: None, } } - pub fn with_public_msg>(s: S) -> Self { - let s = s.into(); - let ret = Self::with_msg(&s); - let ret = ret.add_public_msg(s); - ret + pub fn with_msg>(s: S) -> Self { + Self::with_msg_no_trace(s).add_backtrace() } pub fn with_public_msg_no_trace>(s: S) -> Self { @@ -65,6 +51,14 @@ impl Error { ret } + pub fn with_public_msg>(s: S) -> Self { + let s = s.into(); + let ret = Self::with_msg_no_trace(String::new()); + let ret = ret.add_backtrace(); + let ret = ret.add_public_msg(s); + ret + } + pub fn from_string(e: E) -> Self where E: ToString, @@ -72,6 +66,12 @@ impl Error { Self::with_msg(e.to_string()) } + pub fn add_backtrace(self) -> Self { + let mut ret = self; + ret.trace_str = Some(fmt_backtrace(&backtrace::Backtrace::new())); + ret + } + pub fn mark_bad_request(mut self) -> Self { self.reason = Some(Reason::BadRequest); self @@ -155,9 +155,7 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { impl fmt::Debug for Error { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - let trace_str = if let Some(trace) = &self.trace { - fmt_backtrace(trace) - } else if let Some(s) = &self.trace_str { + let trace_str = if let Some(s) = &self.trace_str { s.into() } else { String::new() @@ -214,10 +212,10 @@ impl From for Error { fn from(k: PublicError) -> Self { Self { msg: String::new(), - trace: None, trace_str: None, public_msg: Some(vec![k.msg().into()]), reason: k.reason(), + parent: None, } } } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index f8d86c9..f40ba36 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -9,8 +9,9 @@ use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; use items::{RangeCompletableItem, Sitemty, StreamItem}; use itertools::Itertools; -use netpod::log::*; use netpod::query::RawEventsQuery; +use netpod::timeunits::SEC; +use netpod::{log::*, ACCEPT_ALL}; use netpod::{ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; use parse::channelconfig::{ @@ -21,7 +22,7 @@ use serde_json::Value as JsonValue; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; +use std::time::{Duration, Instant}; use url::Url; pub trait BackendAware { @@ -500,11 +501,21 @@ pub struct Api1Range { pub struct Api1Query { channels: Vec, range: Api1Range, - // The following are unofficial and not-to-be-used parameters: + // All following parameters are private and not to be used #[serde(rename = "fileIoBufferSize", default)] file_io_buffer_size: Option, #[serde(default)] decompress: bool, + #[serde(rename = "eventsMax", default = "u64_max", skip_serializing_if = "is_u64_max")] + events_max: u64, +} + +fn u64_max() -> u64 { + u64::MAX +} + +fn is_u64_max(x: &u64) -> bool { + *x == u64::MAX } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -527,6 +538,11 @@ pub struct DataApiPython3DataStream { config_fut: Option> + Send>>>, file_io_buffer_size: 
FileIoBufferSize, do_decompress: bool, + #[allow(unused)] + event_count: u64, + events_max: u64, + status_id: String, + ping_last: Instant, data_done: bool, completed: bool, } @@ -537,6 +553,8 @@ impl DataApiPython3DataStream { channels: Vec, file_io_buffer_size: FileIoBufferSize, do_decompress: bool, + events_max: u64, + status_id: String, node_config: NodeConfigCached, ) -> Self { Self { @@ -548,6 +566,10 @@ impl DataApiPython3DataStream { config_fut: None, file_io_buffer_size, do_decompress, + event_count: 0, + events_max, + status_id, + ping_last: Instant::now(), data_done: false, completed: false, } @@ -562,9 +584,12 @@ impl DataApiPython3DataStream { ) -> Result { let mut d = BytesMut::new(); for i1 in 0..b.tss.len() { - if *count_events < 6 { + const EVIMAX: usize = 6; + if *count_events < EVIMAX { info!( - "deco len {:?} BE {} scalar-type {:?} shape {:?} comps {:?}", + "ev info {}/{} decomps len {:?} BE {:?} scalar-type {:?} shape {:?} comps {:?}", + *count_events + 1, + EVIMAX, b.decomps[i1].as_ref().map(|x| x.len()), b.be[i1], b.scalar_types[i1], @@ -657,10 +682,26 @@ impl Stream for DataApiPython3DataStream { match stream.poll_next_unpin(cx) { Ready(k) => match k { Some(k) => match k { - Ok(k) => Ready(Some(Ok(k))), + Ok(k) => { + let n = Instant::now(); + if n.duration_since(self.ping_last) >= Duration::from_millis(2000) { + let mut sb = crate::status_board().unwrap(); + sb.mark_alive(&self.status_id); + self.ping_last = n; + } + Ready(Some(Ok(k))) + } Err(e) => { + error!("DataApiPython3DataStream emit error: {e:?}"); self.data_done = true; - Ready(Some(Err(e))) + let mut sb = crate::status_board().unwrap(); + sb.add_error(&self.status_id, e); + // TODO format as python data api error frame: + //let mut buf = BytesMut::with_capacity(1024); + //buf.put_slice("".as_bytes()); + //Ready(Some(Ok(buf))) + self.data_done = true; + Ready(None) } }, None => { @@ -749,7 +790,12 @@ impl Stream for DataApiPython3DataStream { } }); //let _ = Box::new(s) as Box> + Unpin>; - self.chan_stream = Some(Box::pin(s.map_err(Error::from))); + let evm = if self.events_max == 0 { + usize::MAX + } else { + self.events_max as usize + }; + self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm))); continue; } Ready(Err(e)) => { @@ -778,70 +824,6 @@ impl Stream for DataApiPython3DataStream { } } -pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - debug!("api1_binary_events uri: {:?} headers: {:?}", req.uri(), req.headers()); - let accept_def = ""; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)) - .to_owned(); - let (_head, body) = req.into_parts(); - let body_data = hyper::body::to_bytes(body).await?; - let qu: Api1Query = if let Ok(qu) = serde_json::from_slice(&body_data) { - qu - } else { - error!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec())); - return Err(Error::with_msg_no_trace("can not parse query")); - }; - debug!("got Api1Query: {:?}", qu); - let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date); - let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date); - let beg_date = beg_date?; - let end_date = end_date?; - debug!("beg_date {:?} end_date {:?}", beg_date, end_date); - //let url = Url::parse(&format!("dummy:{}", req.uri()))?; - //let query = PlainEventsBinaryQuery::from_url(&url)?; - // TODO add stricter check for types, check with client. 
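A note on the Api1Query change above: eventsMax uses a sentinel-default serde idiom, where an absent field deserializes to u64::MAX (meaning no limit) and skip_serializing_if keeps the sentinel off the wire again on the way out. The pattern in self-contained form (QuerySketch is illustrative; serde and serde_json assumed as dependencies):

```rust
use serde::{Deserialize, Serialize};

fn u64_max() -> u64 {
    u64::MAX
}

fn is_u64_max(x: &u64) -> bool {
    *x == u64::MAX
}

#[derive(Debug, Serialize, Deserialize)]
struct QuerySketch {
    channels: Vec<String>,
    // Absent on input means "no limit"; the sentinel is skipped on
    // output so a round-trip does not invent the field.
    #[serde(rename = "eventsMax", default = "u64_max", skip_serializing_if = "is_u64_max")]
    events_max: u64,
}

fn main() {
    let q: QuerySketch = serde_json::from_str(r#"{"channels":["ch1"]}"#).unwrap();
    assert_eq!(q.events_max, u64::MAX);
    assert_eq!(serde_json::to_string(&q).unwrap(), r#"{"channels":["ch1"]}"#);
}
```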
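The poll loop above also introduces the liveness-reporting pattern used with the status board: a mark_alive ping rides along the data path at most once per two seconds, so the per-item cost is normally a single Instant::now() plus a comparison. The throttle in isolation (the Board trait stands in for the real crate::status_board() singleton):

```rust
use std::time::{Duration, Instant};

/// Stand-in for the status board; only the call pattern matters here.
pub trait Board {
    fn mark_alive(&mut self, status_id: &str);
}

pub struct PingThrottle {
    ping_last: Instant,
    period: Duration,
}

impl PingThrottle {
    pub fn new() -> Self {
        Self {
            ping_last: Instant::now(),
            period: Duration::from_millis(2000),
        }
    }

    /// Call once per streamed item; the board is touched only when
    /// the period has elapsed since the last ping.
    pub fn tick(&mut self, board: &mut dyn Board, status_id: &str) {
        let n = Instant::now();
        if n.duration_since(self.ping_last) >= self.period {
            board.mark_alive(status_id);
            self.ping_last = n;
        }
    }
}
```

Related detail from the same hunk: before capping the stream with take(evm), events_max == 0 is mapped to usize::MAX, so 0 behaves as "unlimited" rather than "emit nothing".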
- if accept == APP_OCTET {} - if false { - let e = Error::with_msg(format!("unexpected Accept: {:?}", accept)); - error!("{:?}", e); - return Err(e); - } - let beg_ns = beg_date.timestamp() as u64 * 1000000000 + beg_date.timestamp_subsec_nanos() as u64; - let end_ns = end_date.timestamp() as u64 * 1000000000 + end_date.timestamp_subsec_nanos() as u64; - let range = NanoRange { - beg: beg_ns, - end: end_ns, - }; - // TODO use the proper backend name: - let backend = "DUMMY"; - let chans = qu - .channels - .iter() - .map(|x| Channel { - backend: backend.into(), - name: x.clone(), - }) - .collect(); - let file_io_buffer_size = if let Some(k) = qu.file_io_buffer_size { - k - } else { - node_config.node_config.cluster.file_io_buffer_size.clone() - }; - let s = DataApiPython3DataStream::new( - range.clone(), - chans, - file_io_buffer_size, - qu.decompress, - node_config.clone(), - ); - let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", "dummy"); - let ret = ret.body(BodyStream::wrapped(s, format!("api1_binary_events")))?; - return Ok(ret); -} - fn shape_to_api3proto(sh: &Option>) -> Vec { match sh { None => vec![], @@ -856,3 +838,91 @@ fn shape_to_api3proto(sh: &Option>) -> Vec { } } } + +pub struct Api1EventsBinaryHandler {} + +impl Api1EventsBinaryHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/1/query" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!( + "Api1EventsBinaryHandler::handle uri: {:?} headers: {:?}", + req.uri(), + req.headers() + ); + if req.method() != Method::POST { + return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?); + } + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(Ok(ACCEPT_ALL), |k| k.to_str()) + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))? + .to_owned(); + let (_head, body) = req.into_parts(); + let body_data = hyper::body::to_bytes(body).await?; + let qu: Api1Query = if let Ok(qu) = serde_json::from_slice(&body_data) { + qu + } else { + error!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec())); + return Err(Error::with_msg_no_trace("can not parse query")); + }; + info!( + "Api1Query {:?} {} {:?}", + qu.range, + qu.channels.len(), + qu.channels.first() + ); + let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date); + let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date); + let beg_date = beg_date?; + let end_date = end_date?; + info!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date); + //let url = Url::parse(&format!("dummy:{}", req.uri()))?; + //let query = PlainEventsBinaryQuery::from_url(&url)?; + if accept != APP_OCTET && accept != ACCEPT_ALL { + // TODO set the public error code and message and return Err(e). 
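The Accept handling in the new handler resolves the old TODO: a missing Accept header counts as */*, a malformed one becomes a client error, and any value other than octet-stream or */* is answered with 406 NOT_ACCEPTABLE up front rather than failing mid-stream. A compact sketch of the check; the concrete APP_OCTET and ACCEPT_ALL strings are assumptions about the netpod constants:

```rust
use http::header::{HeaderMap, ACCEPT};

// Assumed values of the netpod constants used above.
const APP_OCTET: &str = "application/octet-stream";
const ACCEPT_ALL: &str = "*/*";

/// Missing header counts as */*; a non-UTF-8 header value is a
/// client error; everything else must be octet-stream or */*.
fn accept_ok(headers: &HeaderMap) -> Result<bool, String> {
    let accept = headers
        .get(ACCEPT)
        .map_or(Ok(ACCEPT_ALL), |k| k.to_str())
        .map_err(|e| format!("{e:?}"))?;
    Ok(accept == APP_OCTET || accept == ACCEPT_ALL)
}
```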
+ let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept)); + error!("{e:?}"); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + let beg = beg_date.timestamp() as u64 * SEC + beg_date.timestamp_subsec_nanos() as u64; + let end = end_date.timestamp() as u64 * SEC + end_date.timestamp_subsec_nanos() as u64; + let range = NanoRange { beg, end }; + // TODO check for valid given backend name: + let backend = &node_config.node_config.cluster.backend; + let chans = qu + .channels + .iter() + .map(|x| Channel { + backend: backend.into(), + name: x.clone(), + }) + .collect(); + let file_io_buffer_size = if let Some(k) = qu.file_io_buffer_size { + k + } else { + node_config.node_config.cluster.file_io_buffer_size.clone() + }; + // TODO use a better stream protocol with built-in error delivery. + let status_id = super::status_board()?.new_status_id(); + let s = DataApiPython3DataStream::new( + range.clone(), + chans, + file_io_buffer_size, + qu.decompress, + qu.events_max, + status_id.clone(), + node_config.clone(), + ); + let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", status_id); + let ret = ret.body(BodyStream::wrapped(s, format!("Api1EventsBinaryHandler")))?; + Ok(ret) + } +} diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 0b54a7f..88cb295 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -31,6 +31,11 @@ use netpod::{ACCEPT_ALL, APP_JSON, APP_JSON_LINES, APP_OCTET}; use nodenet::conn::events_service; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; +use serde::Serialize; +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::{Once, RwLock, RwLockWriteGuard}; +use std::time::SystemTime; use std::{future, net, panic, pin, task}; use task::{Context, Poll}; use tracing::field::Empty; @@ -42,6 +47,13 @@ fn proxy_mark() -> &'static str { } pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { + static STATUS_BOARD_INIT: Once = Once::new(); + STATUS_BOARD_INIT.call_once(|| { + let b = StatusBoard::new(); + let a = RwLock::new(b); + let x = Box::new(a); + STATUS_BOARD.store(Box::into_raw(x), Ordering::SeqCst); + }); let _update_task = if node_config.node_config.cluster.run_map_pulse_task { Some(UpdateTask::new(node_config.clone())) } else { @@ -287,12 +299,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/1/query" { - if req.method() == Method::POST { - Ok(api1::api1_binary_events(req, &node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) 
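Two small things in the hunks above are worth making explicit. First, host() now installs the global status board exactly once via std::sync::Once before the server starts accepting requests, which is what makes the unguarded load in status_board() (end of this patch) sound. Second, the handler converts RFC 3339 wall time to epoch nanoseconds using netpod's SEC constant; the conversion on its own (chrono assumed, SEC taken to be nanoseconds per second as its use in the patch implies, and pre-epoch dates not handled, matching the as u64 casts):

```rust
use chrono::{DateTime, FixedOffset};

// Assumption: matches netpod::timeunits::SEC.
const SEC: u64 = 1_000_000_000;

/// RFC 3339 wall time to nanoseconds since the Unix epoch.
fn rfc3339_to_epoch_ns(s: &str) -> Result<u64, chrono::ParseError> {
    let dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(s)?;
    Ok(dt.timestamp() as u64 * SEC + dt.timestamp_subsec_nanos() as u64)
}

fn main() {
    let ns = rfc3339_to_epoch_ns("2022-03-02T09:55:11.5+01:00").unwrap();
    assert_eq!(ns % SEC, 500_000_000);
}
```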
- } + } else if let Some(h) = api1::Api1EventsBinaryHandler::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = evinfo::EventInfoScan::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) { @@ -479,6 +487,26 @@ impl ToPublicResponse for ::err::Error { } } +pub struct StatusBoardAllHandler {} + +impl StatusBoardAllHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/status/board/all" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, _req: Request, _node_config: &NodeConfigCached) -> Result, Error> { + use std::ops::Deref; + let sb = status_board().unwrap(); + let buf = serde_json::to_vec(sb.deref()).unwrap(); + let res = response(StatusCode::OK).body(Body::from(buf))?; + Ok(res) + } +} + async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { match binned_inner(req, node_config).await { Ok(ret) => Ok(ret), @@ -731,6 +759,127 @@ pub async fn update_search_cache(req: Request, node_config: &NodeConfigCac Ok(ret) } +#[derive(Serialize)] +pub struct StatusBoardEntry { + #[allow(unused)] + #[serde(serialize_with = "instant_serde::ser")] + ts_created: SystemTime, + #[serde(serialize_with = "instant_serde::ser")] + ts_updated: SystemTime, + is_error: bool, + is_ok: bool, + errors: Vec, +} + +mod instant_serde { + use super::*; + use serde::Serializer; + pub fn ser(x: &SystemTime, ser: S) -> Result { + let dur = x.duration_since(std::time::UNIX_EPOCH).unwrap(); + let dt = chrono::TimeZone::timestamp(&chrono::Utc, dur.as_secs() as i64, dur.subsec_nanos()); + let s = dt.format("%Y-%m-%d %H:%M:%S").to_string(); + ser.serialize_str(&s) + } +} + +impl StatusBoardEntry { + pub fn new() -> Self { + Self { + ts_created: SystemTime::now(), + ts_updated: SystemTime::now(), + is_error: false, + is_ok: false, + errors: vec![], + } + } +} + +#[derive(Serialize)] +pub struct StatusBoard { + entries: BTreeMap, +} + +impl StatusBoard { + pub fn new() -> Self { + Self { + entries: BTreeMap::new(), + } + } + + pub fn new_status_id(&mut self) -> String { + use std::fs::File; + use std::io::Read; + self.clean(); + let mut f = File::open("/dev/urandom").unwrap(); + let mut buf = [0; 8]; + f.read_exact(&mut buf).unwrap(); + let n = u64::from_le_bytes(buf); + let s = format!("{:016x}", n); + self.entries.insert(s.clone(), StatusBoardEntry::new()); + s + } + + pub fn clean(&mut self) { + if self.entries.len() > 15000 { + let mut tss: Vec<_> = self.entries.values().map(|e| e.ts_updated).collect(); + tss.sort_unstable(); + let tss = tss; + let tsm = tss[tss.len() / 3]; + let a = std::mem::replace(&mut self.entries, BTreeMap::new()); + self.entries = a.into_iter().filter(|(_k, v)| v.ts_updated >= tsm).collect(); + } + } + + pub fn mark_alive(&mut self, status_id: &str) { + match self.entries.get_mut(status_id) { + Some(e) => { + e.ts_updated = SystemTime::now(); + } + None => { + error!("can not find status id {}", status_id); + } + } + } + + pub fn mark_ok(&mut self, status_id: &str) { + match self.entries.get_mut(status_id) { + Some(e) => { + e.ts_updated = SystemTime::now(); + if !e.is_error { + e.is_ok = true; + } + } + None => { + error!("can not find status id {}", status_id); + } + } + } + + pub fn add_error(&mut self, status_id: &str, error: Error) { + match self.entries.get_mut(status_id) { + Some(e) => { + e.ts_updated = SystemTime::now(); + e.is_error = true; + e.is_ok = false; + e.errors.push(error); + } + None => { + error!("can not find 
status id {}", status_id); + } + } + } +} + +static STATUS_BOARD: AtomicPtr> = AtomicPtr::new(std::ptr::null_mut()); + +pub fn status_board() -> Result, Error> { + let x = unsafe { &*STATUS_BOARD.load(Ordering::SeqCst) }.write(); + match x { + Ok(x) => Ok(x), + Err(e) => Err(Error::with_msg(format!("{e:?}"))), + } +} + pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url);
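Putting the status-board pieces together: new_status_id() draws 8 bytes from /dev/urandom for a 16-hex-digit id and also calls clean(), which starts pruning roughly the stalest third of entries (by ts_updated) once the board exceeds 15000. The intended flow across the patch, as a sketch composed from the calls added above (the failure text is made up):

```rust
// Sketch only: composes the functions introduced in this patch.
// Each status_board() call takes the global write lock, and the
// guard is dropped at the end of its statement, so no line below
// holds the lock across an .await point.
fn status_flow_sketch() -> Result<(), Error> {
    // Handler side: allocate an id, returned to the client as the
    // x-daqbuffer-request-id response header.
    let status_id = status_board()?.new_status_id();

    // Streaming side: throttled keep-alive while data flows.
    status_board()?.mark_alive(&status_id);

    // On failure after the 200 status line has already gone out,
    // the error is parked on the board instead of being lost.
    status_board()?.add_error(&status_id, Error::with_msg_no_trace("example failure"));

    // py data api 3 (or a human) can then inspect
    // /api/4/status/board/all to recover the post-fetch status.
    Ok(())
}
```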
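A design note on the static: the board is published through Once plus AtomicPtr and intentionally never freed, and status_board() assumes host() has already run; calling it earlier would dereference a null pointer. If a small dependency were acceptable, the same 'static access could be had safely with lazy initialization; a possible alternative, assuming once_cell (not used by this patch):

```rust
use once_cell::sync::Lazy;
use std::sync::{RwLock, RwLockWriteGuard};

// Initialized on first use; no unsafe and no init ordering to get wrong.
static STATUS_BOARD_ALT: Lazy<RwLock<StatusBoard>> = Lazy::new(|| RwLock::new(StatusBoard::new()));

pub fn status_board_alt() -> Result<RwLockWriteGuard<'static, StatusBoard>, Error> {
    STATUS_BOARD_ALT
        .write()
        .map_err(|e| Error::with_msg(format!("{e:?}")))
}
```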