From 4e1874381c963c085feac9c22693e31a626b0ad8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 31 Oct 2024 12:14:46 +0100 Subject: [PATCH] release 0.5.3 --- crates/daqbuffer/Cargo.toml | 2 +- crates/httpret/Cargo.toml | 3 + crates/httpret/src/httpret.rs | 6 +- .../binning/timeweight/timeweight_events.rs | 1 - .../timeweight/timeweight_events_dyn.rs | 21 ++--- crates/nodenet/src/scylla.rs | 1 - crates/query/src/api4.rs | 1 - crates/scyllaconn/src/events.rs | 9 +- crates/scyllaconn/src/events2/events.rs | 93 +++---------------- crates/scyllaconn/src/events2/msp.rs | 10 +- crates/streams/src/timebin/fromevents.rs | 1 - crates/streams/src/timebinnedjson.rs | 6 -- 12 files changed, 32 insertions(+), 122 deletions(-) diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 25ec841..0d60bd6 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.3-aa.6" +version = "0.5.3" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index 8abb9d2..da0e6da 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -43,3 +43,6 @@ taskrun = { path = "../taskrun" } scyllaconn = { path = "../scyllaconn" } daqbuf-redis = { path = "../daqbuf-redis" } httpclient = { path = "../httpclient" } + +[features] +prometheus_endpoint = [] diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 5c292c9..15768e2 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -7,14 +7,13 @@ pub mod channelconfig; pub mod download; pub mod err; pub mod gather; -#[cfg(DISABLED)] +#[cfg(feature = "prometheus_endpoint")] pub mod prometheus; pub mod proxy; pub mod pulsemap; pub mod requests; pub mod settings; -use self::bodystream::ToPublicResponse; use crate::bodystream::response; use crate::err::Error; use ::err::thiserror; @@ -39,7 +38,6 @@ use netpod::query::prebinned::PreBinnedQuery; use netpod::req_uri_to_url; use netpod::status_board; use netpod::status_board_init; -use netpod::Database; use netpod::NodeConfigCached; use netpod::ReqCtx; use netpod::ServiceVersion; @@ -118,7 +116,7 @@ impl ServiceSharedResources { pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> { status_board_init(); - #[cfg(DISABLED)] + #[cfg(feature = "prometheus_endpoint")] if let Some(bind) = ncc.node.prometheus_api_bind { tokio::spawn(prometheus::host(bind)); } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index ffcfdc8..6553fa9 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -1,5 +1,4 @@ use super::super::container_events::EventValueType; -use super::___; use crate::binning::aggregator::AggregatorTimeWeight; use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::ContainerEvents; diff --git a/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs b/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs index 4be8de5..a242639 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs @@ -1,5 +1,4 @@ use super::timeweight_events::BinnedEventsTimeweight; -use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::EventValueType; use crate::channelevents::ChannelEvents; @@ -10,22 +9,22 @@ use futures_util::StreamExt; use items_0::streamitem::LogItem; use items_0::streamitem::Sitemty; use items_0::timebin::BinnedEventsTimeweightTrait; -use items_0::timebin::BinningggBinnerDyn; use items_0::timebin::BinningggContainerBinsDyn; -use items_0::timebin::BinningggContainerEventsDyn; use items_0::timebin::BinningggError; use items_0::timebin::BinsBoxed; use items_0::timebin::EventsBoxed; use netpod::log::*; use netpod::BinnedRange; use netpod::TsNano; -use std::any; -use std::arch::x86_64; use std::ops::ControlFlow; use std::pin::Pin; use std::task::Context; use std::task::Poll; +macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } + +macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } + #[derive(Debug, ThisError)] #[cstm(name = "BinnedEventsTimeweightDyn")] pub enum Error { @@ -37,7 +36,6 @@ pub struct BinnedEventsTimeweightDynbox where EVT: EventValueType, { - range: BinnedRange, binner: BinnedEventsTimeweight, } @@ -47,8 +45,7 @@ where { pub fn new(range: BinnedRange) -> Box { let ret = Self { - binner: BinnedEventsTimeweight::new(range.clone()), - range, + binner: BinnedEventsTimeweight::new(range), }; Box::new(ret) } @@ -165,7 +162,7 @@ impl BinnedEventsTimeweightStream { fn handle_sitemty( mut self: Pin<&mut Self>, item: Sitemty, - cx: &mut Context, + _cx: &mut Context, ) -> ControlFlow::Item>>> { use items_0::streamitem::RangeCompletableItem::*; use items_0::streamitem::StreamItem::*; @@ -212,8 +209,8 @@ impl BinnedEventsTimeweightStream { } } - fn handle_eos(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { - debug!("handle_eos"); + fn handle_eos(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll::Item>> { + trace_input_container!("handle_eos"); use items_0::streamitem::RangeCompletableItem::*; use items_0::streamitem::StreamItem::*; use Poll::*; @@ -229,7 +226,7 @@ impl BinnedEventsTimeweightStream { } match self.binned_events.output().map_err(::err::Error::from_string)? { Some(x) => { - debug!("seeing ready bins {:?}", x); + trace_emit!("seeing ready bins {:?}", x); Ready(Some(Ok(DataItem(Data(x))))) } None => { diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 43bfd98..3d02961 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -41,7 +41,6 @@ pub async fn scylla_channel_event_stream( let readopts = EventReadOpts::new( evq.need_one_before_range(), evq.need_value_data(), - evq.transform().enum_as_string().unwrap_or(false), evq.settings().scylla_read_queue_len(), ); let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { diff --git a/crates/query/src/api4.rs b/crates/query/src/api4.rs index 91c34d7..f84f1e0 100644 --- a/crates/query/src/api4.rs +++ b/crates/query/src/api4.rs @@ -6,7 +6,6 @@ use chrono::TimeZone; use chrono::Utc; use err::Error; use netpod::get_url_query_pairs; -use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::ttl::RetentionTime; use netpod::AppendToUrl; diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 07c3b7d..99c7080 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -28,14 +28,7 @@ use std::sync::Arc; use std::time::Instant; use tracing::Instrument; -#[allow(unused)] -macro_rules! trace_fetch { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} +macro_rules! trace_fetch { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } #[derive(Debug, ThisError)] #[cstm(name = "ScyllaReadEvents")] diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 24691df..c9d69f1 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -27,39 +27,33 @@ use std::task::Context; use std::task::Poll; use std::time::Duration; -#[allow(unused)] +macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[allow(unused)] macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[allow(unused)] macro_rules! trace_redo_fwd_read { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[allow(unused)] macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[allow(unused)] macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[allow(unused)] macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } #[derive(Debug, Clone)] pub struct EventReadOpts { with_values: bool, - enum_as_strings: bool, one_before: bool, qucap: u32, } impl EventReadOpts { - pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool, qucap: Option) -> Self { + pub fn new(one_before: bool, with_values: bool, qucap: Option) -> Self { Self { one_before, with_values, - enum_as_strings, - qucap: qucap.unwrap_or(2), + qucap: qucap.unwrap_or(1), } } @@ -144,10 +138,6 @@ impl ReadQueue { } } - fn cap(&self) -> usize { - self.cap - } - fn len(&self) -> usize { self.futs.len() } @@ -185,12 +175,14 @@ impl Stream for ReadQueue { } } -struct FetchEvents2 { - fut: Fst, +struct FetchEvents { + fut: FetchEventsFut, } -struct FetchEvents { - qu: ReadQueue, +impl FetchEvents { + fn from_fut(fut: Pin, ReadJobTrace), Error>> + Send>>) -> Self { + Self { fut } + } } enum ReadingState { @@ -276,7 +268,7 @@ impl EventsStreamRt { readopts: EventReadOpts, scyqueue: ScyllaQueue, ) -> Self { - debug!("EventsStreamRt::new {ch_conf:?} {range:?} {rt:?} {readopts:?}"); + trace_init!("EventsStreamRt::new {ch_conf:?} {range:?} {rt:?} {readopts:?}"); let series = SeriesId::new(ch_conf.series()); let msp_inp = crate::events2::msp::MspStreamRt::new(rt.clone(), series, range.clone(), scyqueue.clone()); Self { @@ -422,10 +414,8 @@ impl EventsStreamRt { let jobtrace = ReadJobTrace::new(); let mfi = MakeFutInfo::new(self); let fut = Self::make_read_events_fut(ts, true, mfi, jobtrace); - let mut qu = ReadQueue::new(self.qucap); - qu.push(fut); self.state = State::ReadingBck(ReadingBck { - reading_state: ReadingState::FetchEvents(FetchEvents { qu }), + reading_state: ReadingState::FetchEvents(FetchEvents::from_fut(fut)), }); } else { trace_fetch!("setup_bck_read no msp"); @@ -558,7 +548,6 @@ impl Stream for EventsStreamRt { }); } else { trace_fetch!("State::Begin Fwd"); - // let fut = Self::make_msp_read_fut(&mut self.msp_inp); self.setup_fwd_read(); } continue; @@ -566,7 +555,6 @@ impl Stream for EventsStreamRt { State::ReadingBck(st) => match &mut st.reading_state { ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { Ready(Ok(a)) => { - // trace_fetch!("ReadingBck FetchMsp {}", a.fmt()); if a.len() == 0 { self.transition_to_bck_read(); continue; @@ -592,8 +580,8 @@ impl Stream for EventsStreamRt { Ready(Err(e)) => Ready(Some(Err(e.into()))), Pending => Pending, }, - ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) { - Ready(Some(x)) => match x { + ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { + Ready(x) => match x { Ok((mut evs, jobtrace)) => { use items_2::merger::Mergeable; trace_fetch!("ReadingBck {jobtrace}"); @@ -627,10 +615,6 @@ impl Stream for EventsStreamRt { Ready(Some(Err(e))) } }, - Ready(None) => { - self.state = State::Done; - Ready(Some(Err(Error::ReadQueueEmptyBck))) - } Pending => Pending, }, }, @@ -643,7 +627,7 @@ impl Stream for EventsStreamRt { match a { Ok(a) => { if a.len() == 0 { - trace_fetch!("MSP INPUT DONE --------------------"); + trace_msp_fetch!("msp input done"); st.msp_done = true; } for x in a { @@ -664,8 +648,6 @@ impl Stream for EventsStreamRt { trace_msp_fetch!("create msp read fut"); let fut = Self::make_msp_read_fut(&mut self2.msp_inp); st.msp_fut = Some(FetchMsp { fut }); - } else { - // trace_fetch!("nothing to do for msp fetch"); } if st.qu.has_space() { Self::redo_fwd_read(st, msp_buf); @@ -706,50 +688,6 @@ impl Stream for EventsStreamRt { } } } - // State::ReadingFwd(st) => match &mut st.reading_state { - // ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { - // Ready(Ok(a)) => { - // // trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt()); - // for x in a { - // self.msp_buf.push_back(x); - // } - // if self.msp_buf.len() == 0 { - // self.state = State::InputDone; - // continue; - // } else { - // self.setup_fwd_read(); - // continue; - // } - // } - // Ready(Err(e)) => Ready(Some(Err(e.into()))), - // Pending => Pending, - // }, - // ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) { - // Ready(Some(x)) => match x { - // Ok((evs, mut jobtrace)) => { - // jobtrace - // .add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32)); - // use items_2::merger::Mergeable; - // trace_fetch!("ReadingFwd {jobtrace}"); - // for ts in Mergeable::tss(&evs) { - // trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); - // } - // self.out.push_back(evs); - // self.setup_fwd_read(); - // continue; - // } - // Err(e) => { - // self.state = State::Done; - // Ready(Some(Err(e.into()))) - // } - // }, - // Ready(None) => { - // self.state = State::Done; - // Ready(Some(Err(Error::ReadQueueEmptyFwd))) - // } - // Pending => Pending, - // }, - // }, State::InputDone => { if self.out.len() == 0 { self.state = State::Done; @@ -758,7 +696,6 @@ impl Stream for EventsStreamRt { items_2::empty::empty_events_dyn_ev(self.ch_conf.scalar_type(), self.ch_conf.shape()); match d { Ok(empty) => { - // let empty = items_0::streamitem::sitem_data(ChannelEvents::Events(empty)); let item = items_2::channelevents::ChannelEvents::Events(empty); Ready(Some(Ok(item))) } diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index 73b8dcd..eba1004 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -1,7 +1,6 @@ use super::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; -use core::fmt; use err::thiserror; use err::ThisError; use futures_util::Future; @@ -19,14 +18,7 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -#[allow(unused)] -macro_rules! trace_emit { - ($det:expr, $($arg:tt)*) => { - if $det { - trace!($($arg)*); - } - }; -} +macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) } #[derive(Debug, ThisError)] #[cstm(name = "EventsMsp")] diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs index 316e7cf..384ddaf 100644 --- a/crates/streams/src/timebin/fromevents.rs +++ b/crates/streams/src/timebin/fromevents.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -#[allow(unused)] macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } #[derive(Debug, ThisError)] diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 72f008e..afa44f9 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -228,12 +228,6 @@ async fn timebinned_stream( ) -> Result>> + Send>>, Error> { use netpod::query::CacheUsage; let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Ignore); - debug!("BINNING NEW METHOD"); - debug!( - "timebinned_stream caching {:?} subgrids {:?}", - query, - query.subgrids() - ); let do_time_weight = true; let bin_len_layers = if let Some(subgrids) = query.subgrids() { subgrids