From 3a77d116f67e82e582915e80a4ad2dced136ae82 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 15 Jun 2024 09:49:07 +0200 Subject: [PATCH] Prepare for merge of retention times --- crates/daqbuffer/Cargo.toml | 2 +- crates/httpret/src/channelconfig.rs | 6 +- crates/netpod/src/range/evrange.rs | 9 + crates/nodenet/src/scylla.rs | 29 +- crates/query/src/api4/events.rs | 24 ++ crates/scyllaconn/src/events.rs | 42 +- crates/scyllaconn/src/events2.rs | 4 + crates/scyllaconn/src/events2/events.rs | 276 ++++++++++++ crates/scyllaconn/src/events2/firstbefore.rs | 22 + crates/scyllaconn/src/events2/mergert.rs | 423 +++++++++++++++++++ crates/scyllaconn/src/events2/msp.rs | 52 +-- crates/scyllaconn/src/events2/nonempty.rs | 42 ++ crates/taskrun/src/taskrun.rs | 12 +- 13 files changed, 896 insertions(+), 47 deletions(-) create mode 100644 crates/scyllaconn/src/events2/events.rs create mode 100644 crates/scyllaconn/src/events2/firstbefore.rs create mode 100644 crates/scyllaconn/src/events2/mergert.rs create mode 100644 crates/scyllaconn/src/events2/nonempty.rs diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 1f9fd8e..f347371 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.1-aa.1" +version = "0.5.1-aa.2" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 5e0a973..ee95bef 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -639,7 +639,7 @@ impl ScyllaSeriesTsMsp { let mut st_ts_msp_ms = Vec::new(); let mut msp_stream = - scyllaconn::events2::msp::MspStream::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone()); + scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone()); use chrono::TimeZone; while let Some(x) = msp_stream.next().await { let v = x.unwrap().ms(); @@ -650,7 +650,7 @@ impl ScyllaSeriesTsMsp { let mut mt_ts_msp_ms = Vec::new(); let mut msp_stream = - scyllaconn::events2::msp::MspStream::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone()); + scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone()); while let Some(x) = msp_stream.next().await { let v = x.unwrap().ms(); let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap(); @@ -660,7 +660,7 @@ impl ScyllaSeriesTsMsp { let mut lt_ts_msp_ms = Vec::new(); let mut msp_stream = - scyllaconn::events2::msp::MspStream::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone()); + scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone()); while let Some(x) = msp_stream.next().await { let v = x.unwrap().ms(); let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap(); diff --git a/crates/netpod/src/range/evrange.rs b/crates/netpod/src/range/evrange.rs index a6e6b8c..cfab2c3 100644 --- a/crates/netpod/src/range/evrange.rs +++ b/crates/netpod/src/range/evrange.rs @@ -77,6 +77,15 @@ impl NanoRange { } } +impl From<(u64, u64)> for NanoRange { + fn from(value: (u64, u64)) -> Self { + Self { + beg: value.0, + end: value.1, + } + } +} + impl TryFrom<&SeriesRange> for NanoRange { type Error = Error; diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 1044c86..74f0824 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -1,6 +1,7 @@ use err::Error; use futures_util::Stream; use futures_util::StreamExt; +use futures_util::TryStreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -10,6 +11,7 @@ use netpod::ttl::RetentionTime; use netpod::ChConf; use query::api4::events::EventsSubQuery; use scyllaconn::worker::ScyllaQueue; +use scyllaconn::SeriesId; use std::pin::Pin; use taskrun::tokio; @@ -28,7 +30,30 @@ pub async fn scylla_channel_event_stream( let do_test_stream_error = false; let with_values = evq.need_value_data(); debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n"); - let stream = scyllaconn::events::EventsStreamScylla::new( + let stream: Pin + Send>> = if evq.use_all_rt() { + let x = scyllaconn::events2::mergert::MergeRts::new( + SeriesId::new(chconf.series()), + scalar_type.clone(), + shape.clone(), + evq.range().into(), + with_values, + scyqueue.clone(), + ); + Box::pin(x) + } else { + let x = scyllaconn::events2::events::EventsStreamRt::new( + RetentionTime::Short, + SeriesId::new(chconf.series()), + scalar_type.clone(), + shape.clone(), + evq.range().into(), + with_values, + scyqueue.clone(), + ) + .map_err(|e| scyllaconn::events2::mergert::Error::from(e)); + Box::pin(x) + }; + /*let stream = scyllaconn::events::EventsStreamScylla::new( RetentionTime::Short, series, evq.range().into(), @@ -38,7 +63,7 @@ pub async fn scylla_channel_event_stream( with_values, scyqueue.clone(), do_test_stream_error, - ); + );*/ let stream = stream .map(move |item| match &item { Ok(k) => match k { diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index c9e5eab..25a8900 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -56,6 +56,8 @@ pub struct PlainEventsQuery { create_errors: Vec, #[serde(default)] log_level: String, + #[serde(default)] + use_all_rt: bool, } impl PlainEventsQuery { @@ -81,6 +83,7 @@ impl PlainEventsQuery { merger_out_len_max: None, create_errors: Vec::new(), log_level: String::new(), + use_all_rt: false, } } @@ -206,6 +209,10 @@ impl PlainEventsQuery { pub fn log_level(&self) -> &str { &self.log_level } + + pub fn use_all_rt(&self) -> bool { + self.use_all_rt + } } impl HasBackend for PlainEventsQuery { @@ -283,6 +290,11 @@ impl FromUrl for PlainEventsQuery { .map(|x| x.split(",").map(|x| x.to_string()).collect()) .unwrap_or(Vec::new()), log_level: pairs.get("log_level").map_or(String::new(), String::from), + use_all_rt: pairs + .get("useAllRt") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg_no_trace(format!("can not parse useAllRt: {}", e)))?, }; Ok(ret) } @@ -342,6 +354,9 @@ impl AppendToUrl for PlainEventsQuery { if self.log_level.len() != 0 { g.append_pair("log_level", &self.log_level); } + if self.use_all_rt { + g.append_pair("useAllRt", "true"); + } } } @@ -385,6 +400,7 @@ pub struct EventsSubQuerySettings { buf_len_disk_io: Option, queue_len_disk_io: Option, create_errors: Vec, + use_all_rt: bool, } impl Default for EventsSubQuerySettings { @@ -398,6 +414,7 @@ impl Default for EventsSubQuerySettings { buf_len_disk_io: None, queue_len_disk_io: None, create_errors: Vec::new(), + use_all_rt: true, } } } @@ -414,6 +431,7 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings { // TODO add to query queue_len_disk_io: None, create_errors: value.create_errors.clone(), + use_all_rt: value.use_all_rt(), } } } @@ -431,6 +449,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings { // TODO add to query queue_len_disk_io: None, create_errors: Vec::new(), + use_all_rt: true, } } } @@ -448,6 +467,7 @@ impl From<&Api1Query> for EventsSubQuerySettings { buf_len_disk_io: Some(disk_io_tune.read_buffer_len), queue_len_disk_io: Some(disk_io_tune.read_queue_len), create_errors: Vec::new(), + use_all_rt: false, } } } @@ -551,6 +571,10 @@ impl EventsSubQuery { pub fn log_level(&self) -> &str { &self.log_level } + + pub fn use_all_rt(&self) -> bool { + self.settings.use_all_rt + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 89dc3fb..475eac2 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -26,6 +26,7 @@ use scylla::frame::response::result::Row; use scylla::prepared_statement::PreparedStatement; use scylla::Session; use scylla::Session as ScySession; +use series::SeriesId; use std::collections::VecDeque; use std::mem; use std::pin::Pin; @@ -71,6 +72,17 @@ impl StmtsLspShape { fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> { let ret = match stname { "u8" => &self.u8, + "u16" => &self.u16, + "u32" => &self.u32, + "u64" => &self.u64, + "i8" => &self.i8, + "i16" => &self.i16, + "i32" => &self.i32, + "i64" => &self.i64, + "f32" => &self.f32, + "f64" => &self.f64, + "bool" => &self.bool, + "string" => &self.string, _ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))), }; Ok(ret) @@ -449,7 +461,7 @@ impl_scaty_array!(Vec, f32, Vec, "f32", "f32"); impl_scaty_array!(Vec, f64, Vec, "f64", "f64"); impl_scaty_array!(Vec, bool, Vec, "bool", "bool"); -struct ReadNextValuesOpts { +pub(super) struct ReadNextValuesOpts { rt: RetentionTime, series: u64, ts_msp: TsMs, @@ -459,7 +471,29 @@ struct ReadNextValuesOpts { scyqueue: ScyllaQueue, } -async fn read_next_values(opts: ReadNextValuesOpts) -> Result, Error> +impl ReadNextValuesOpts { + pub(super) fn new( + rt: RetentionTime, + series: SeriesId, + ts_msp: TsMs, + range: ScyllaSeriesRange, + fwd: bool, + with_values: bool, + scyqueue: ScyllaQueue, + ) -> Self { + Self { + rt, + series: series.id(), + ts_msp, + range, + fwd, + with_values, + scyqueue, + } + } +} + +pub(super) async fn read_next_values(opts: ReadNextValuesOpts) -> Result, Error> where ST: ValTy, { @@ -648,7 +682,7 @@ fn convert_rows( Ok(ret) } -struct ReadValues { +pub(super) struct ReadValues { rt: RetentionTime, series: u64, scalar_type: ScalarType, @@ -663,7 +697,7 @@ struct ReadValues { } impl ReadValues { - fn new( + pub(super) fn new( rt: RetentionTime, series: u64, scalar_type: ScalarType, diff --git a/crates/scyllaconn/src/events2.rs b/crates/scyllaconn/src/events2.rs index a0554af..be623b6 100644 --- a/crates/scyllaconn/src/events2.rs +++ b/crates/scyllaconn/src/events2.rs @@ -1 +1,5 @@ +pub mod events; +pub mod firstbefore; +pub mod mergert; pub mod msp; +pub mod nonempty; diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs new file mode 100644 index 0000000..e2fefde --- /dev/null +++ b/crates/scyllaconn/src/events2/events.rs @@ -0,0 +1,276 @@ +use super::msp::MspStreamRt; +use crate::events::read_next_values; +use crate::events::ReadNextValuesOpts; +use crate::range::ScyllaSeriesRange; +use crate::worker::ScyllaQueue; +use err::thiserror; +use err::ThisError; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::Events; +use items_2::channelevents::ChannelEvents; +use netpod::log::*; +use netpod::ttl::RetentionTime; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsMs; +use series::SeriesId; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, ThisError)] +pub enum Error { + Worker(#[from] crate::worker::Error), + Events(#[from] crate::events::Error), + Msp(#[from] crate::events2::msp::Error), + Logic, +} + +struct FetchMsp { + fut: Pin>> + Send>>, +} + +struct FetchEvents { + fut: Pin, crate::events2::events::Error>> + Send>>, +} + +enum ReadingState { + FetchMsp(FetchMsp), + FetchEvents(FetchEvents), +} + +struct Reading { + scyqueue: ScyllaQueue, + reading_state: ReadingState, +} + +enum State { + Begin, + Reading(Reading), + InputDone, +} + +pub struct EventsStreamRt { + rt: RetentionTime, + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + range: ScyllaSeriesRange, + with_values: bool, + state: State, + scyqueue: ScyllaQueue, + msp_inp: MspStreamRt, + out: VecDeque>, + ts_seen_max: u64, +} + +impl EventsStreamRt { + pub fn new( + rt: RetentionTime, + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + range: ScyllaSeriesRange, + with_values: bool, + scyqueue: ScyllaQueue, + ) -> Self { + debug!("EventsStreamRt::new {series:?} {range:?} {rt:?}"); + let msp_inp = + crate::events2::msp::MspStreamRt::new(rt.clone(), series.clone(), range.clone(), scyqueue.clone()); + Self { + rt, + series, + scalar_type, + shape, + range, + with_values, + state: State::Begin, + scyqueue, + msp_inp, + out: VecDeque::new(), + ts_seen_max: 0, + } + } + + fn __handle_reading(self: Pin<&mut Self>, st: &mut Reading, cx: &mut Context) -> Result<(), Error> { + let _ = st; + let _ = cx; + todo!() + } + + fn make_read_events_fut( + &mut self, + ts_msp: TsMs, + scyqueue: ScyllaQueue, + ) -> Pin, Error>> + Send>> { + let fwd = true; + let opts = ReadNextValuesOpts::new( + self.rt.clone(), + self.series.clone(), + ts_msp, + self.range.clone(), + fwd, + self.with_values, + scyqueue, + ); + let scalar_type = self.scalar_type.clone(); + let shape = self.shape.clone(); + let fut = async move { + let ret = match &shape { + Shape::Scalar => match &scalar_type { + ScalarType::U8 => read_next_values::(opts).await, + ScalarType::U16 => read_next_values::(opts).await, + ScalarType::U32 => read_next_values::(opts).await, + ScalarType::U64 => read_next_values::(opts).await, + ScalarType::I8 => read_next_values::(opts).await, + ScalarType::I16 => read_next_values::(opts).await, + ScalarType::I32 => read_next_values::(opts).await, + ScalarType::I64 => read_next_values::(opts).await, + ScalarType::F32 => read_next_values::(opts).await, + ScalarType::F64 => read_next_values::(opts).await, + ScalarType::BOOL => read_next_values::(opts).await, + ScalarType::STRING => read_next_values::(opts).await, + ScalarType::Enum => read_next_values::(opts).await, + ScalarType::ChannelStatus => { + warn!("read scalar channel status not yet supported"); + err::todoval() + } + }, + Shape::Wave(_) => match &scalar_type { + ScalarType::U8 => read_next_values::>(opts).await, + ScalarType::U16 => read_next_values::>(opts).await, + ScalarType::U32 => read_next_values::>(opts).await, + ScalarType::U64 => read_next_values::>(opts).await, + ScalarType::I8 => read_next_values::>(opts).await, + ScalarType::I16 => read_next_values::>(opts).await, + ScalarType::I32 => read_next_values::>(opts).await, + ScalarType::I64 => read_next_values::>(opts).await, + ScalarType::F32 => read_next_values::>(opts).await, + ScalarType::F64 => read_next_values::>(opts).await, + ScalarType::BOOL => read_next_values::>(opts).await, + ScalarType::STRING => { + warn!("read array string not yet supported"); + err::todoval() + } + ScalarType::Enum => read_next_values::>(opts).await, + ScalarType::ChannelStatus => { + warn!("read array channel status not yet supported"); + err::todoval() + } + }, + _ => { + error!("TODO ReadValues add more types"); + err::todoval() + } + }; + ret.map_err(Error::from) + }; + Box::pin(fut) + } +} + +impl Stream for EventsStreamRt { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + if let Some(item) = self.out.pop_front() { + item.verify(); + if let Some(item_min) = item.ts_min() { + if item_min < self.ts_seen_max { + debug!("ordering error A {} {}", item_min, self.ts_seen_max); + } + } + if let Some(item_max) = item.ts_max() { + if item_max < self.ts_seen_max { + debug!("ordering error B {} {}", item_max, self.ts_seen_max); + } else { + self.ts_seen_max = item_max; + } + } + debug!("deliver item {}", item.output_info()); + break Ready(Some(Ok(ChannelEvents::Events(item)))); + } + break match &mut self.state { + State::Begin => { + let msp_inp = unsafe { + let ptr = (&mut self.msp_inp) as *mut MspStreamRt; + &mut *ptr + }; + let fut = Box::pin(msp_inp.next()); + self.state = State::Reading(Reading { + scyqueue: self.scyqueue.clone(), + reading_state: ReadingState::FetchMsp(FetchMsp { fut }), + }); + continue; + } + State::Reading(st) => match &mut st.reading_state { + ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(ts))) => { + let scyqueue = st.scyqueue.clone(); + let fut = self.make_read_events_fut(ts, scyqueue); + if let State::Reading(st) = &mut self.state { + st.reading_state = ReadingState::FetchEvents(FetchEvents { fut }); + continue; + } else { + Ready(Some(Err(Error::Logic))) + } + } + Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), + Ready(None) => { + self.state = State::InputDone; + continue; + } + Pending => Pending, + }, + ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { + Ready(Ok(x)) => { + self.out.push_back(x); + let msp_inp = unsafe { + let ptr = (&mut self.msp_inp) as *mut MspStreamRt; + &mut *ptr + }; + let fut = Box::pin(msp_inp.next()); + if let State::Reading(st) = &mut self.state { + st.reading_state = ReadingState::FetchMsp(FetchMsp { fut }); + continue; + } else { + Ready(Some(Err(Error::Logic))) + } + } + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => Pending, + }, + }, + State::InputDone => { + if self.out.len() == 0 { + Ready(None) + } else { + continue; + } + } + }; + } + } +} + +fn trait_assert(_: T) +where + T: Stream + Unpin + Send, +{ +} + +#[allow(unused)] +fn trait_assert_try() { + let x: EventsStreamRt = phantomval(); + trait_assert(x); +} + +fn phantomval() -> T { + panic!() +} diff --git a/crates/scyllaconn/src/events2/firstbefore.rs b/crates/scyllaconn/src/events2/firstbefore.rs new file mode 100644 index 0000000..9c0aafa --- /dev/null +++ b/crates/scyllaconn/src/events2/firstbefore.rs @@ -0,0 +1,22 @@ +use futures_util::Stream; +use items_0::Events; +use items_0::WithLen; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct FirstBefore { + inp: S, +} + +impl Stream for FirstBefore +where + S: Stream> + Unpin, + T: Events, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } +} diff --git a/crates/scyllaconn/src/events2/mergert.rs b/crates/scyllaconn/src/events2/mergert.rs new file mode 100644 index 0000000..a2c9498 --- /dev/null +++ b/crates/scyllaconn/src/events2/mergert.rs @@ -0,0 +1,423 @@ +use super::events::EventsStreamRt; +use super::nonempty::NonEmpty; +use crate::range::ScyllaSeriesRange; +use crate::worker::ScyllaQueue; +use err::thiserror; +use err::ThisError; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_2::channelevents::ChannelEvents; +use items_2::merger::Mergeable; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; +use netpod::ttl::RetentionTime; +use netpod::ScalarType; +use netpod::Shape; +use series::SeriesId; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, ThisError)] +pub enum Error { + Events(#[from] crate::events2::events::Error), + Logic, +} + +enum Resolvable +where + F: Future, +{ + Future(F), + Output(::Output), + Taken, +} + +impl Resolvable +where + F: Future, +{ + fn unresolved(&self) -> bool { + match self { + Resolvable::Future(_) => true, + Resolvable::Output(_) => false, + Resolvable::Taken => false, + } + } + + fn take(&mut self) -> Option<::Output> { + let x = std::mem::replace(self, Resolvable::Taken); + match x { + Resolvable::Future(_) => None, + Resolvable::Output(x) => Some(x), + Resolvable::Taken => None, + } + } +} + +impl Future for Resolvable +where + F: Future + Unpin, +{ + type Output = ::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<::Output> { + match unsafe { self.get_unchecked_mut() } { + Resolvable::Future(fut) => fut.poll_unpin(cx), + Resolvable::Output(_) => panic!(), + Resolvable::Taken => panic!(), + } + } +} + +struct ReadEvents { + fut: Pin>> + Send>>, +} + +enum State { + Begin, + FetchFirstSt(ReadEvents), + FetchFirstMt(ReadEvents), + FetchFirstLt(ReadEvents), + ReadingLt( + Option, + VecDeque, + Option>>, + ), + ReadingMt( + Option, + VecDeque, + Option>>, + ), + ReadingSt( + Option, + VecDeque, + Option>>, + ), + Error, +} + +pub struct MergeRts { + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + range: ScyllaSeriesRange, + with_values: bool, + scyqueue: ScyllaQueue, + inp_st: Option>>, + inp_mt: Option>>, + inp_lt: Option>>, + state: State, + buf_st: VecDeque, + buf_mt: VecDeque, + buf_lt: VecDeque, + out: VecDeque, +} + +impl MergeRts { + pub fn new( + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + range: ScyllaSeriesRange, + with_values: bool, + scyqueue: ScyllaQueue, + ) -> Self { + Self { + series, + scalar_type, + shape, + range, + with_values, + scyqueue, + inp_st: None, + inp_mt: None, + inp_lt: None, + state: State::Begin, + buf_st: VecDeque::new(), + buf_mt: VecDeque::new(), + buf_lt: VecDeque::new(), + out: VecDeque::new(), + } + } + + fn setup_first_st(&mut self) { + let inp = EventsStreamRt::new( + RetentionTime::Short, + self.series.clone(), + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.with_values, + self.scyqueue.clone(), + ); + let inp = NonEmpty::new(inp); + self.inp_st = Some(Box::new(inp)); + } + + fn setup_first_mt(&mut self) { + let inp = EventsStreamRt::new( + RetentionTime::Medium, + self.series.clone(), + self.scalar_type.clone(), + self.shape.clone(), + Self::constrained_range(&self.range, &self.buf_st), + self.with_values, + self.scyqueue.clone(), + ); + let inp = NonEmpty::new(inp); + self.inp_mt = Some(Box::new(inp)); + } + + fn setup_first_lt(&mut self) { + let inp = EventsStreamRt::new( + RetentionTime::Long, + self.series.clone(), + self.scalar_type.clone(), + self.shape.clone(), + Self::constrained_range(&self.range, &self.buf_mt), + self.with_values, + self.scyqueue.clone(), + ); + let inp = NonEmpty::new(inp); + self.inp_lt = Some(Box::new(inp)); + } + + fn setup_read_st(&mut self) -> ReadEvents { + let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut NonEmpty) }; + let fut = Box::pin(stream.next()); + ReadEvents { fut } + } + + fn setup_read_mt(&mut self) -> ReadEvents { + let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut NonEmpty) }; + let fut = Box::pin(stream.next()); + ReadEvents { fut } + } + + fn setup_read_lt(&mut self) -> ReadEvents { + let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut NonEmpty) }; + let fut = Box::pin(stream.next()); + ReadEvents { fut } + } + + fn setup_read_any(inp: &mut Option>>) -> ReadEvents { + let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut NonEmpty) }; + let fut = Box::pin(stream.next()); + ReadEvents { fut } + } + + fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque) -> ScyllaSeriesRange { + if let Some(e) = buf.front() { + if let Some(ts) = e.ts_min() { + let nrange = NanoRange::from((ts, 0)); + ScyllaSeriesRange::from(&SeriesRange::from(nrange)) + } else { + debug!("no ts even though should not have empty buffers"); + full.clone() + } + } else { + full.clone() + } + } + + fn dummy(&mut self) -> bool { + if self.inp_lt.is_some() { + // *fut = Some(self.setup_read_lt()); + true + } else { + false + } + } +} + +impl Stream for MergeRts { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut out2 = VecDeque::new(); + loop { + while let Some(x) = out2.pop_front() { + self.out.push_back(x); + } + if let Some(item) = self.out.pop_front() { + break Ready(Some(Ok(item))); + } + break match &mut self.state { + State::Begin => { + self.setup_first_st(); + self.state = State::FetchFirstSt(self.setup_read_st()); + continue; + } + State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + debug!("have first from ST"); + self.buf_st.push_back(x); + self.setup_first_mt(); + self.state = State::FetchFirstMt(self.setup_read_mt()); + continue; + } + Ready(Some(Err(e))) => { + self.state = State::Error; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + debug!("no first from ST"); + self.inp_st = None; + self.setup_first_mt(); + self.state = State::FetchFirstMt(self.setup_read_mt()); + continue; + } + Pending => Pending, + }, + State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + debug!("have first from MT"); + self.buf_mt.push_back(x); + self.setup_first_lt(); + self.state = State::FetchFirstLt(self.setup_read_lt()); + continue; + } + Ready(Some(Err(e))) => { + self.state = State::Error; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + debug!("no first from MT"); + self.inp_mt = None; + self.setup_first_lt(); + self.state = State::FetchFirstLt(self.setup_read_lt()); + continue; + } + Pending => Pending, + }, + State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + debug!("have first from LT"); + self.buf_lt.push_back(x); + let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); + self.state = State::ReadingLt(None, buf, self.inp_lt.take()); + continue; + } + Ready(Some(Err(e))) => { + self.state = State::Error; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + debug!("no first from LT"); + self.inp_lt = None; + let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); + self.state = State::ReadingLt(None, buf, self.inp_lt.take()); + continue; + } + Pending => Pending, + }, + State::ReadingLt(fut, buf, inp) => { + if let Some(x) = buf.pop_front() { + out2.push_back(x); + continue; + } else if let Some(fut2) = fut { + match fut2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + *fut = None; + buf.push_back(x); + continue; + } + Ready(Some(Err(e))) => { + *fut = None; + self.state = State::Error; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + *fut = None; + self.inp_lt = None; + continue; + } + Pending => Pending, + } + } else if inp.is_some() { + let buf = core::mem::replace(buf, VecDeque::new()); + self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take()); + // *fut = Some(self.setup_read_lt()); + continue; + } else { + let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new()); + self.state = State::ReadingMt(None, buf, self.inp_mt.take()); + continue; + } + } + State::ReadingMt(fut, buf, inp) => { + if let Some(x) = buf.pop_front() { + out2.push_back(x); + continue; + } else if let Some(fut2) = fut { + match fut2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + *fut = None; + buf.push_back(x); + continue; + } + Ready(Some(Err(e))) => { + *fut = None; + self.state = State::Error; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + *fut = None; + self.inp_mt = None; + continue; + } + Pending => Pending, + } + } else if inp.is_some() { + let buf = core::mem::replace(buf, VecDeque::new()); + self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take()); + continue; + } else { + let buf = core::mem::replace(&mut self.buf_st, VecDeque::new()); + self.state = State::ReadingSt(None, buf, self.inp_st.take()); + continue; + } + } + State::ReadingSt(fut, buf, inp) => { + if let Some(x) = buf.pop_front() { + out2.push_back(x); + continue; + } else if let Some(fut2) = fut { + match fut2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + *fut = None; + buf.push_back(x); + continue; + } + Ready(Some(Err(e))) => { + *fut = None; + self.state = State::Error; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + *fut = None; + self.inp_st = None; + continue; + } + Pending => Pending, + } + } else if inp.is_some() { + let buf = core::mem::replace(buf, VecDeque::new()); + self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take()); + continue; + } else { + debug!("fully done"); + Ready(None) + } + } + State::Error => Ready(None), + }; + } + } +} diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index a9213b3..0da7866 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -54,7 +54,7 @@ enum State { } #[pin_project::pin_project] -pub struct MspStream { +pub struct MspStreamRt { rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, @@ -63,7 +63,7 @@ pub struct MspStream { out: VecDeque, } -impl MspStream { +impl MspStreamRt { pub fn new(rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, scyqueue: ScyllaQueue) -> Self { let fut_bck = { let scyqueue = scyqueue.clone(); @@ -93,7 +93,7 @@ impl MspStream { } } -impl Stream for MspStream { +impl Stream for MspStreamRt { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { @@ -112,8 +112,7 @@ impl Stream for MspStream { have_pending = true; } }, - Resolvable::Output(_) => {} - Resolvable::Taken => {} + _ => {} } let rsv = &mut st.fut_fwd; match rsv { @@ -125,37 +124,28 @@ impl Stream for MspStream { have_pending = true; } }, - Resolvable::Output(_) => {} - Resolvable::Taken => {} + _ => {} } if have_pending { Pending } else { let taken_bck = st.fut_bck.take(); let taken_fwd = st.fut_fwd.take(); - if let Some(x) = taken_bck { - match x { - Ok(v) => { - for e in v { - self.out.push_back(e) - } - - if let Some(x) = taken_fwd { - match x { - Ok(v) => { - for e in v { - self.out.push_back(e) - } - - self.state = State::InputDone; - continue; - } - Err(e) => Ready(Some(Err(e.into()))), + self.state = State::InputDone; + if let (Some(taken_bck), Some(taken_fwd)) = (taken_bck, taken_fwd) { + match taken_bck { + Ok(v1) => match taken_fwd { + Ok(v2) => { + for e in v1 { + self.out.push_back(e) } - } else { - Ready(Some(Err(Error::Logic))) + for e in v2 { + self.out.push_back(e) + } + continue; } - } + Err(e) => Ready(Some(Err(e.into()))), + }, Err(e) => Ready(Some(Err(e.into()))), } } else { @@ -183,10 +173,10 @@ where #[allow(unused)] fn trait_assert_try() { - let x: MspStream = todoval(); + let x: MspStreamRt = phantomval(); trait_assert(x); } -fn todoval() -> T { - todo!() +fn phantomval() -> T { + panic!() } diff --git a/crates/scyllaconn/src/events2/nonempty.rs b/crates/scyllaconn/src/events2/nonempty.rs new file mode 100644 index 0000000..565576a --- /dev/null +++ b/crates/scyllaconn/src/events2/nonempty.rs @@ -0,0 +1,42 @@ +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::WithLen; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct NonEmpty { + inp: S, +} + +impl NonEmpty { + pub fn new(inp: S) -> Self { + Self { inp } + } +} + +impl Stream for NonEmpty +where + S: Stream> + Unpin, + T: WithLen, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(x))) => { + if x.len() != 0 { + Ready(Some(Ok(x))) + } else { + continue; + } + } + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => Ready(None), + Pending => Pending, + }; + } + } +} diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index 4a20afc..9698d17 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -159,7 +159,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { console_subscriber::init(); } else { // Logging setup - let filter = tracing_subscriber::EnvFilter::builder() + let filter_1 = tracing_subscriber::EnvFilter::builder() .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .from_env() .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; @@ -168,9 +168,9 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .from_env() .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; - let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| { + /*let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| { if true { - return true; + return false; } if *meta.level() <= tracing::Level::TRACE { if ["httpret", "scyllaconn"].contains(&meta.target()) { @@ -207,7 +207,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { } else { true } - }); + });*/ let fmt_layer = tracing_subscriber::fmt::Layer::new() .with_writer(io::stderr) .with_timer(timer) @@ -215,9 +215,9 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .with_ansi(false) .with_thread_names(true) .event_format(formatter::FormatTxt) - .with_filter(filter_3) + // .with_filter(filter_3) .with_filter(filter_2) - .with_filter(filter) + .with_filter(filter_1) // .and_then(LogFilterLayer::new("lay1".into())) // .and_then(LogFilterLayer::new("lay2".into())) ;