release 0.5.3

Dominik Werder
2024-10-31 12:14:46 +01:00
parent cde2e4c1a2
commit 4e1874381c
12 changed files with 32 additions and 122 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.3-aa.6"
version = "0.5.3"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -43,3 +43,6 @@ taskrun = { path = "../taskrun" }
scyllaconn = { path = "../scyllaconn" }
daqbuf-redis = { path = "../daqbuf-redis" }
httpclient = { path = "../httpclient" }
[features]
prometheus_endpoint = []

View File

@@ -7,14 +7,13 @@ pub mod channelconfig;
pub mod download;
pub mod err;
pub mod gather;
#[cfg(DISABLED)]
#[cfg(feature = "prometheus_endpoint")]
pub mod prometheus;
pub mod proxy;
pub mod pulsemap;
pub mod requests;
pub mod settings;
use self::bodystream::ToPublicResponse;
use crate::bodystream::response;
use crate::err::Error;
use ::err::thiserror;
@@ -39,7 +38,6 @@ use netpod::query::prebinned::PreBinnedQuery;
use netpod::req_uri_to_url;
use netpod::status_board;
use netpod::status_board_init;
use netpod::Database;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use netpod::ServiceVersion;
@@ -118,7 +116,7 @@ impl ServiceSharedResources {
pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> {
status_board_init();
#[cfg(DISABLED)]
#[cfg(feature = "prometheus_endpoint")]
if let Some(bind) = ncc.node.prometheus_api_bind {
tokio::spawn(prometheus::host(bind));
}
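Taken together, the Cargo.toml and lib.rs hunks above turn the previously hard-disabled prometheus endpoint into an opt-in Cargo feature: the [features] table declares prometheus_endpoint, and the #[cfg(DISABLED)] guards on the module and on the tokio::spawn call become #[cfg(feature = "prometheus_endpoint")]. A minimal standalone sketch of the pattern, with illustrative module and function names (only the feature name comes from the diff):

// Compiled only when the crate is built with
// `cargo build --features prometheus_endpoint`
// (and the feature is declared in Cargo.toml as `prometheus_endpoint = []`).
#[cfg(feature = "prometheus_endpoint")]
mod prometheus_sketch {
    pub fn host(bind: &str) {
        // Placeholder for the real endpoint server.
        println!("prometheus endpoint would listen on {bind}");
    }
}

fn main() {
    // In a default build this statement is removed at compile time, so the
    // binary carries no prometheus code at all.
    #[cfg(feature = "prometheus_endpoint")]
    prometheus_sketch::host("0.0.0.0:9090");

    println!("service starting (sketch)");
}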

View File

@@ -1,5 +1,4 @@
use super::super::container_events::EventValueType;
use super::___;
use crate::binning::aggregator::AggregatorTimeWeight;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::ContainerEvents;

View File

@@ -1,5 +1,4 @@
use super::timeweight_events::BinnedEventsTimeweight;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::ContainerEvents;
use crate::binning::container_events::EventValueType;
use crate::channelevents::ChannelEvents;
@@ -10,22 +9,22 @@ use futures_util::StreamExt;
use items_0::streamitem::LogItem;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinnedEventsTimeweightTrait;
use items_0::timebin::BinningggBinnerDyn;
use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::timebin::BinningggError;
use items_0::timebin::BinsBoxed;
use items_0::timebin::EventsBoxed;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::TsNano;
use std::any;
use std::arch::x86_64;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
#[derive(Debug, ThisError)]
#[cstm(name = "BinnedEventsTimeweightDyn")]
pub enum Error {
@@ -37,7 +36,6 @@ pub struct BinnedEventsTimeweightDynbox<EVT>
where
EVT: EventValueType,
{
range: BinnedRange<TsNano>,
binner: BinnedEventsTimeweight<EVT>,
}
@@ -47,8 +45,7 @@ where
{
pub fn new(range: BinnedRange<TsNano>) -> Box<dyn BinnedEventsTimeweightTrait> {
let ret = Self {
binner: BinnedEventsTimeweight::new(range.clone()),
range,
binner: BinnedEventsTimeweight::new(range),
};
Box::new(ret)
}
@@ -165,7 +162,7 @@ impl BinnedEventsTimeweightStream {
fn handle_sitemty(
mut self: Pin<&mut Self>,
item: Sitemty<ChannelEvents>,
cx: &mut Context,
_cx: &mut Context,
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
@@ -212,8 +209,8 @@ impl BinnedEventsTimeweightStream {
}
}
fn handle_eos(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
debug!("handle_eos");
fn handle_eos(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
trace_input_container!("handle_eos");
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use Poll::*;
@@ -229,7 +226,7 @@ impl BinnedEventsTimeweightStream {
}
match self.binned_events.output().map_err(::err::Error::from_string)? {
Some(x) => {
debug!("seeing ready bins {:?}", x);
trace_emit!("seeing ready bins {:?}", x);
Ready(Some(Ok(DataItem(Data(x)))))
}
None => {
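In the hunks above, handle_sitemty keeps its ControlFlow return type while its cx parameter becomes unused (_cx). As I read the signatures, Continue(()) tells the outer poll loop to keep consuming input, while Break(poll) carries the value poll_next should return. A simplified, self-contained sketch of that pattern (all types here are illustrative stand-ins, not the daqbuffer ones):

use std::ops::ControlFlow;
use std::task::Poll;

// Illustrative stand-ins for the stream's input and output item types.
type Item = Result<u64, String>;

fn handle_input(item: Option<u64>) -> ControlFlow<Poll<Option<Item>>> {
    match item {
        // Nothing ready to emit yet: tell the poll loop to keep consuming input.
        Some(v) if v % 2 == 0 => ControlFlow::Continue(()),
        // Something is ready: break out with the value poll_next should return.
        Some(v) => ControlFlow::Break(Poll::Ready(Some(Ok(v)))),
        // End of the input stream.
        None => ControlFlow::Break(Poll::Ready(None)),
    }
}

fn poll_loop(inputs: &mut Vec<Option<u64>>) -> Poll<Option<Item>> {
    while let Some(item) = inputs.pop() {
        match handle_input(item) {
            ControlFlow::Continue(()) => continue,
            ControlFlow::Break(poll) => return poll,
        }
    }
    Poll::Pending
}

fn main() {
    let mut inputs = vec![None, Some(3), Some(2)];
    // Pops from the back: the even value is consumed, the odd value is emitted.
    println!("{:?}", poll_loop(&mut inputs));
}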

View File

@@ -41,7 +41,6 @@ pub async fn scylla_channel_event_stream(
let readopts = EventReadOpts::new(
evq.need_one_before_range(),
evq.need_value_data(),
evq.transform().enum_as_string().unwrap_or(false),
evq.settings().scylla_read_queue_len(),
);
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {

View File

@@ -6,7 +6,6 @@ use chrono::TimeZone;
use chrono::Utc;
use err::Error;
use netpod::get_url_query_pairs;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::ttl::RetentionTime;
use netpod::AppendToUrl;

View File

@@ -28,14 +28,7 @@ use std::sync::Arc;
use std::time::Instant;
use tracing::Instrument;
#[allow(unused)]
macro_rules! trace_fetch {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
macro_rules! trace_fetch { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaReadEvents")]

View File

@@ -27,39 +27,33 @@ use std::task::Context;
use std::task::Poll;
use std::time::Duration;
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_redo_fwd_read { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
#[derive(Debug, Clone)]
pub struct EventReadOpts {
with_values: bool,
enum_as_strings: bool,
one_before: bool,
qucap: u32,
}
impl EventReadOpts {
pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool, qucap: Option<u32>) -> Self {
pub fn new(one_before: bool, with_values: bool, qucap: Option<u32>) -> Self {
Self {
one_before,
with_values,
enum_as_strings,
qucap: qucap.unwrap_or(2),
qucap: qucap.unwrap_or(1),
}
}
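This hunk, together with the call-site change in scylla_channel_event_stream earlier in the commit, narrows EventReadOpts::new to three arguments: the enum_as_strings flag is gone, and an unconfigured read-queue capacity now falls back to 1 instead of 2. A minimal re-statement of the new constructor semantics (field and parameter names follow the diff; the type itself is a sketch, not the daqbuffer one):

#[derive(Debug, Clone)]
pub struct EventReadOpts {
    with_values: bool,
    one_before: bool,
    qucap: u32,
}

impl EventReadOpts {
    pub fn new(one_before: bool, with_values: bool, qucap: Option<u32>) -> Self {
        Self {
            one_before,
            with_values,
            // Unconfigured read queues now default to a single in-flight read.
            qucap: qucap.unwrap_or(1),
        }
    }
}

fn main() {
    // e.g. a plain value read with no explicit queue length configured:
    let opts = EventReadOpts::new(false, true, None);
    println!("{opts:?}");
}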
@@ -144,10 +138,6 @@ impl ReadQueue {
}
}
fn cap(&self) -> usize {
self.cap
}
fn len(&self) -> usize {
self.futs.len()
}
@@ -185,12 +175,14 @@ impl Stream for ReadQueue {
}
}
struct FetchEvents2 {
fut: Fst<FetchEventsFut>,
struct FetchEvents {
fut: FetchEventsFut,
}
struct FetchEvents {
qu: ReadQueue,
impl FetchEvents {
fn from_fut(fut: Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>>) -> Self {
Self { fut }
}
}
enum ReadingState {
@@ -276,7 +268,7 @@ impl EventsStreamRt {
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
) -> Self {
debug!("EventsStreamRt::new {ch_conf:?} {range:?} {rt:?} {readopts:?}");
trace_init!("EventsStreamRt::new {ch_conf:?} {range:?} {rt:?} {readopts:?}");
let series = SeriesId::new(ch_conf.series());
let msp_inp = crate::events2::msp::MspStreamRt::new(rt.clone(), series, range.clone(), scyqueue.clone());
Self {
@@ -422,10 +414,8 @@ impl EventsStreamRt {
let jobtrace = ReadJobTrace::new();
let mfi = MakeFutInfo::new(self);
let fut = Self::make_read_events_fut(ts, true, mfi, jobtrace);
let mut qu = ReadQueue::new(self.qucap);
qu.push(fut);
self.state = State::ReadingBck(ReadingBck {
reading_state: ReadingState::FetchEvents(FetchEvents { qu }),
reading_state: ReadingState::FetchEvents(FetchEvents::from_fut(fut)),
});
} else {
trace_fetch!("setup_bck_read no msp");
@@ -558,7 +548,6 @@ impl Stream for EventsStreamRt {
});
} else {
trace_fetch!("State::Begin Fwd");
// let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.setup_fwd_read();
}
continue;
@@ -566,7 +555,6 @@ impl Stream for EventsStreamRt {
State::ReadingBck(st) => match &mut st.reading_state {
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(a)) => {
// trace_fetch!("ReadingBck FetchMsp {}", a.fmt());
if a.len() == 0 {
self.transition_to_bck_read();
continue;
@@ -592,8 +580,8 @@ impl Stream for EventsStreamRt {
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => Pending,
},
ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) {
Ready(Some(x)) => match x {
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(x) => match x {
Ok((mut evs, jobtrace)) => {
use items_2::merger::Mergeable;
trace_fetch!("ReadingBck {jobtrace}");
@@ -627,10 +615,6 @@ impl Stream for EventsStreamRt {
Ready(Some(Err(e)))
}
},
Ready(None) => {
self.state = State::Done;
Ready(Some(Err(Error::ReadQueueEmptyBck)))
}
Pending => Pending,
},
},
@@ -643,7 +627,7 @@ impl Stream for EventsStreamRt {
match a {
Ok(a) => {
if a.len() == 0 {
trace_fetch!("MSP INPUT DONE --------------------");
trace_msp_fetch!("msp input done");
st.msp_done = true;
}
for x in a {
@@ -664,8 +648,6 @@ impl Stream for EventsStreamRt {
trace_msp_fetch!("create msp read fut");
let fut = Self::make_msp_read_fut(&mut self2.msp_inp);
st.msp_fut = Some(FetchMsp { fut });
} else {
// trace_fetch!("nothing to do for msp fetch");
}
if st.qu.has_space() {
Self::redo_fwd_read(st, msp_buf);
@@ -706,50 +688,6 @@ impl Stream for EventsStreamRt {
}
}
}
// State::ReadingFwd(st) => match &mut st.reading_state {
// ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
// Ready(Ok(a)) => {
// // trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt());
// for x in a {
// self.msp_buf.push_back(x);
// }
// if self.msp_buf.len() == 0 {
// self.state = State::InputDone;
// continue;
// } else {
// self.setup_fwd_read();
// continue;
// }
// }
// Ready(Err(e)) => Ready(Some(Err(e.into()))),
// Pending => Pending,
// },
// ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) {
// Ready(Some(x)) => match x {
// Ok((evs, mut jobtrace)) => {
// jobtrace
// .add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32));
// use items_2::merger::Mergeable;
// trace_fetch!("ReadingFwd {jobtrace}");
// for ts in Mergeable::tss(&evs) {
// trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
// }
// self.out.push_back(evs);
// self.setup_fwd_read();
// continue;
// }
// Err(e) => {
// self.state = State::Done;
// Ready(Some(Err(e.into())))
// }
// },
// Ready(None) => {
// self.state = State::Done;
// Ready(Some(Err(Error::ReadQueueEmptyFwd)))
// }
// Pending => Pending,
// },
// },
State::InputDone => {
if self.out.len() == 0 {
self.state = State::Done;
@@ -758,7 +696,6 @@ impl Stream for EventsStreamRt {
items_2::empty::empty_events_dyn_ev(self.ch_conf.scalar_type(), self.ch_conf.shape());
match d {
Ok(empty) => {
// let empty = items_0::streamitem::sitem_data(ChannelEvents::Events(empty));
let item = items_2::channelevents::ChannelEvents::Events(empty);
Ready(Some(Ok(item)))
}
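The hunks above also rework the backward-read state: instead of pushing the single read future into a ReadQueue and polling the queue, the stream now holds the boxed future directly, and the Ready(None) arm that produced Error::ReadQueueEmptyBck disappears. A hedged, simplified sketch of that shape, assuming the futures crate and reducing the payload types to plain std ones (only the struct and method names follow the diff):

use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::task::{Context, Poll};

struct FetchEvents {
    fut: BoxFuture<'static, Result<Vec<u64>, String>>,
}

impl FetchEvents {
    fn from_fut(fut: BoxFuture<'static, Result<Vec<u64>, String>>) -> Self {
        Self { fut }
    }

    // With exactly one outstanding backward read there is no queue that can
    // run dry, which is why the empty-queue error arm could be dropped.
    fn poll_step(&mut self, cx: &mut Context) -> Poll<Result<Vec<u64>, String>> {
        self.fut.poll_unpin(cx)
    }
}

fn main() {
    let fetch = FetchEvents::from_fut(async { Ok::<_, String>(vec![1, 2, 3]) }.boxed());
    println!("{:?}", block_on(fetch.fut));
}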

View File

@@ -1,7 +1,6 @@
use super::prepare::StmtsEvents;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
@@ -19,14 +18,7 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[allow(unused)]
macro_rules! trace_emit {
($det:expr, $($arg:tt)*) => {
if $det {
trace!($($arg)*);
}
};
}
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) }
#[derive(Debug, ThisError)]
#[cstm(name = "EventsMsp")]

View File

@@ -18,7 +18,6 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
#[allow(unused)]
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
#[derive(Debug, ThisError)]

View File

@@ -228,12 +228,6 @@ async fn timebinned_stream(
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn CollectableDyn>>> + Send>>, Error> {
use netpod::query::CacheUsage;
let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Ignore);
debug!("BINNING NEW METHOD");
debug!(
"timebinned_stream caching {:?} subgrids {:?}",
query,
query.subgrids()
);
let do_time_weight = true;
let bin_len_layers = if let Some(subgrids) = query.subgrids() {
subgrids