From 11d35e0cb602c8bbd385d37287ea78f06dbb9cf6 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 11 Dec 2023 15:55:18 +0100 Subject: [PATCH] 0.5.0-alpha.0 --- crates/httpret/src/api1.rs | 2 +- crates/httpret/src/api4/maintenance.rs | 14 ++++++ crates/httpret/src/channel_status.rs | 1 - crates/httpret/src/httpret.rs | 14 +++--- crates/httpret/src/proxy.rs | 1 - crates/httpret/src/pulsemap.rs | 62 ++++---------------------- crates/streams/src/lib.rs | 1 + crates/streams/src/print_on_done.rs | 41 +++++++++++++++++ 8 files changed, 72 insertions(+), 64 deletions(-) create mode 100644 crates/streams/src/print_on_done.rs diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 3b6aa97..4c21ebf 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -795,7 +795,7 @@ impl Stream for DataApiPython3DataStream { self.completed = true; let dt = self.ts_ctor.elapsed().as_secs_f32(); info!( - "response body sent {} bytes {} items {:0} ms", + "response body sent {} bytes {} items {:.0} ms", self.count_bytes, self.count_emits, 1e3 * dt diff --git a/crates/httpret/src/api4/maintenance.rs b/crates/httpret/src/api4/maintenance.rs index 6c86daf..058640d 100644 --- a/crates/httpret/src/api4/maintenance.rs +++ b/crates/httpret/src/api4/maintenance.rs @@ -62,6 +62,13 @@ impl UpdateDbWithChannelNamesHandler { } Err(e) => Err(e), }); + let stream = streams::print_on_done::PrintOnDone::new( + stream, + Box::pin(|ts| { + let dt = ts.elapsed(); + info!("{} stream done {:.0} ms", Self::self_name(), 1e3 * dt.as_secs_f32()); + }), + ); let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON_LINES) .body(body_stream(stream))?; @@ -122,6 +129,13 @@ impl UpdateDbWithAllChannelConfigsHandler { } Err(e) => Err(e), }); + let stream = streams::print_on_done::PrintOnDone::new( + stream, + Box::pin(|ts| { + let dt = ts.elapsed(); + info!("{} stream done {:.0} ms", Self::self_name(), 1e3 * dt.as_secs_f32()); + }), + ); let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON_LINES) .body(body_stream(stream))?; diff --git a/crates/httpret/src/channel_status.rs b/crates/httpret/src/channel_status.rs index 26f4f31..18f6993 100644 --- a/crates/httpret/src/channel_status.rs +++ b/crates/httpret/src/channel_status.rs @@ -21,7 +21,6 @@ use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ACCEPT_ALL; use netpod::APP_JSON; -use url::Url; pub struct ConnectionStatusEvents {} diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 8be6793..d2b5ed6 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -377,6 +377,12 @@ async fn http_service_inner( Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) + } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { + Ok(h.handle(req, ctx, &node_config).await?) + } else if let Some(h) = channel_status::ChannelStatusEventsHandler::handler(&req) { + Ok(h.handle(req, ctx, &node_config).await?) + } else if let Some(h) = api4::events::EventsHandler::handler(&req) { + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigQuorumHandler::handler(&req) { @@ -395,12 +401,6 @@ async fn http_service_inner( Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { Ok(h.handle(req, &node_config).await?) - } else if let Some(h) = api4::events::EventsHandler::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) - } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) - } else if let Some(h) = channel_status::ChannelStatusEventsHandler::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) } else if path == "/api/4/prebinned" { if req.method() == Method::GET { Ok(prebinned(req, ctx, &node_config).await?) @@ -433,8 +433,6 @@ async fn http_service_inner( Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) - } else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) { - Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::IndexChannelHttpFunction::handler(&req) { Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) { diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 93f01d7..1550f60 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -12,7 +12,6 @@ use crate::gather::SubRes; use crate::pulsemap::MapPulseQuery; use crate::response; use crate::response_err; -use crate::status_board; use crate::status_board_init; use crate::Cont; use futures_util::pin_mut; diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 892121d..7ab89eb 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -91,7 +91,7 @@ async fn make_tables(pgc: &dbconn::pg::Client) -> Result<(), Error> { Ok(()) } -fn timer_channel_names() -> Vec { +fn _timer_channel_names() -> Vec { let sections = vec!["SINEG01", "SINSB01", "SINSB02", "SINSB03", "SINSB04", "SINXB01"]; let suffixes = vec!["MASTER"]; let mut all: Vec<_> = sections @@ -388,57 +388,6 @@ async fn read_chunk_at(mut file: File, pos: u64, chunk_len: Option) -> Resu Ok((ret, file)) } -pub struct IndexFullHttpFunction {} - -impl IndexFullHttpFunction { - pub fn handler(req: &Requ) -> Option { - if req.uri().path().eq("/api/1/map/index/full") { - Some(Self {}) - } else { - None - } - } - - pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { - if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); - } - let ret = match Self::index(false, node_config).await { - Ok(msg) => response(StatusCode::OK).body(body_string(msg))?, - Err(e) => { - error!("IndexFullHttpFunction {e}"); - response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(format!("{:?}", e)))? - } - }; - Ok(ret) - } - - async fn index(do_print: bool, node_config: &NodeConfigCached) -> Result { - // TODO avoid double-insert on central storage. - let mut msg = format!("LOG"); - let pgc = dbconn::create_connection(&node_config.node_config.cluster.database).await?; - // TODO remove update of static columns when older clients are removed. - let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, $7) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6"; - let insert_01 = pgc.prepare(sql).await?; - make_tables(&pgc).await?; - let chs = timer_channel_names(); - for channel_name in chs { - match IndexChannelHttpFunction::index_channel(&channel_name, &pgc, do_print, &insert_01, node_config).await - { - Ok(m) => { - msg.push_str("\n"); - msg.push_str(&m); - } - Err(e) => { - error!("error while indexing {} {:?}", channel_name, e); - //return Err(e); - } - } - } - Ok(msg) - } -} - pub struct IndexChannelHttpFunction {} impl IndexChannelHttpFunction { @@ -688,13 +637,20 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached) false }; CACHE.housekeeping(); - match IndexFullHttpFunction::index(do_print, &node_config).await { + + // TODO do the actual work here + let _ = node_config; + match tokio::time::sleep(Duration::from_millis(100)) + .map(|_| Ok::<_, Error>(123u32)) + .await + { Ok(_) => {} Err(e) => { error!("issue during last update task: {:?}", e); tokio::time::sleep(Duration::from_millis(20000)).await; } } + let ts2 = Instant::now(); let dt = ts2.duration_since(ts1); if do_print || dt >= Duration::from_millis(4000) { diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index aae5c1e..6c1bf29 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -7,6 +7,7 @@ pub mod generators; pub mod itemclone; pub mod needminbuffer; pub mod plaineventsjson; +pub mod print_on_done; pub mod rangefilter2; pub mod slidebuf; pub mod tcprawclient; diff --git a/crates/streams/src/print_on_done.rs b/crates/streams/src/print_on_done.rs new file mode 100644 index 0000000..54280d8 --- /dev/null +++ b/crates/streams/src/print_on_done.rs @@ -0,0 +1,41 @@ +use futures_util::Stream; +use futures_util::StreamExt; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Instant; + +pub struct PrintOnDone { + ts_ctor: Instant, + inp: INP, + on_done: Pin () + Send>>, +} + +impl PrintOnDone { + pub fn new(inp: INP, on_done: Pin () + Send>>) -> Self { + Self { + ts_ctor: Instant::now(), + inp, + on_done, + } + } +} + +impl Stream for PrintOnDone +where + INP: Stream + Unpin, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(x)) => Ready(Some(x)), + Ready(None) => { + (self.on_done)(self.ts_ctor); + Ready(None) + } + Pending => Pending, + } + } +}