From 927ef9ca5570fe2c0ce5c66c8f3e0ce1d3ff6c36 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 26 Nov 2024 16:29:16 +0100 Subject: [PATCH] WIP --- crates/daqbuffer/Cargo.toml | 2 +- crates/daqbuffer/src/fetch.rs | 4 +- crates/httpret/src/api4/binned.rs | 2 +- crates/httpret/src/api4/events.rs | 71 +------- crates/nodenet/src/conn.rs | 21 +-- crates/scyllaconn/src/events2/events.rs | 6 +- .../scyllaconn/src/events2/mergertchained.rs | 5 +- .../src/events2/onebeforeandbulk.rs | 161 +++++++++--------- 8 files changed, 97 insertions(+), 175 deletions(-) diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index f099f3b..4f3f100 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.5-aa.0" +version = "0.5.5-aa.1" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/daqbuffer/src/fetch.rs b/crates/daqbuffer/src/fetch.rs index 439549f..aae7f08 100644 --- a/crates/daqbuffer/src/fetch.rs +++ b/crates/daqbuffer/src/fetch.rs @@ -14,7 +14,7 @@ use netpod::log::*; use netpod::ScalarType; use netpod::Shape; use netpod::APP_CBOR_FRAMED; -use streams::cbor_stream::FramedBytesToSitemtyDynEventsStream; +use streams::cbor_stream::FramedBytesToChannelEventsStream; use url::Url; #[derive(Debug, ThisError)] @@ -49,7 +49,7 @@ pub async fn fetch_cbor(url: &str, scalar_type: ScalarType, shape: Shape) -> Res } debug!("fetch_cbor head {head:?}"); let stream = IncomingStream::new(body); - let stream = FramedBytesToSitemtyDynEventsStream::new(stream, scalar_type, shape); + let stream = FramedBytesToChannelEventsStream::new(stream, scalar_type, shape); let stream = stream .map(|item| { info!("{item:?}"); diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index a4831fb..1bafa6d 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -1,4 +1,3 @@ -use crate::api4::events::bytes_chunks_to_len_framed_str; use crate::bodystream::response; use crate::channelconfig::ch_conf_from_binned; use crate::requests::accepts_json_framed; @@ -40,6 +39,7 @@ use std::sync::Arc; use streams::collect::CollectResult; use streams::eventsplainreader::DummyCacheReadProvider; use streams::eventsplainreader::SfDatabufferEventReadProvider; +use streams::lenframe::bytes_chunks_to_len_framed_str; use streams::timebin::cached::reader::EventsReadProvider; use streams::timebin::CacheReadProvider; use tracing::Instrument; diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index be2616a..f731a8b 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -4,16 +4,10 @@ use crate::requests::accepts_json_framed; use crate::requests::accepts_json_or_all; use crate::response; use crate::ServiceSharedResources; -use bytes::Bytes; -use bytes::BytesMut; use daqbuf_err as err; use dbconn::worker::PgQueue; use err::thiserror; use err::ThisError; -use futures_util::future; -use futures_util::stream; -use futures_util::Stream; -use futures_util::StreamExt; use http::header::CONTENT_TYPE; use http::Method; use http::StatusCode; @@ -41,6 +35,10 @@ use query::api4::events::PlainEventsQuery; use std::sync::Arc; use streams::collect::CollectResult; use streams::instrument::InstrumentStream; +use streams::lenframe::bytes_chunks_to_framed; +use streams::lenframe::bytes_chunks_to_len_framed_str; +use streams::plaineventscbor::plain_events_cbor_stream; +use streams::plaineventsjson::plain_events_json_stream; use tracing::Instrument; #[derive(Debug, ThisError)] @@ -179,8 +177,7 @@ async fn plain_events_cbor_framed( let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let open_bytes = Arc::pin(open_bytes); let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); - let stream = - streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?; + let stream = plain_events_cbor_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?; let stream = bytes_chunks_to_framed(stream); let logspan = if evq.log_level() == "trace" { trace!("enable trace for handler"); @@ -210,8 +207,7 @@ async fn plain_events_json_framed( let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let open_bytes = Arc::pin(open_bytes); let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); - let stream = - streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?; + let stream = plain_events_json_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?; let stream = bytes_chunks_to_len_framed_str(stream); let ret = response(StatusCode::OK) .header(CONTENT_TYPE, APP_JSON_FRAMED) @@ -271,58 +267,3 @@ async fn plain_events_json( } } } - -fn bytes_chunks_to_framed(stream: S) -> impl Stream> -where - S: Stream>, - T: Into, - E: std::error::Error, -{ - use future::ready; - stream - // TODO unify this map to padded bytes for both json and cbor output - .flat_map(|x| match x { - Ok(y) => { - use bytes::BufMut; - let buf = y.into(); - let adv = (buf.len() + 7) / 8 * 8; - let pad = adv - buf.len(); - let mut b2 = BytesMut::with_capacity(16); - b2.put_u32_le(buf.len() as u32); - b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); - let mut b3 = BytesMut::with_capacity(16); - b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]); - stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())]) - } - Err(e) => { - error!("{e}"); - stream::iter([Ok(Bytes::new()), Ok(Bytes::new()), Ok(Bytes::new())]) - } - }) - .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) -} - -// TODO move this, it's also used by binned. -pub fn bytes_chunks_to_len_framed_str(stream: S) -> impl Stream> -where - S: Stream>, - T: Into, - E: std::error::Error, -{ - use future::ready; - stream - .flat_map(|x| match x { - Ok(y) => { - use std::fmt::Write; - let s = y.into(); - let mut b2 = String::with_capacity(16); - write!(b2, "{:15}\n", s.len()).unwrap(); - stream::iter([Ok::<_, E>(b2), Ok(s), Ok(String::from("\n"))]) - } - Err(e) => { - error!("{e}"); - stream::iter([Ok(String::new()), Ok(String::new()), Ok(String::new())]) - } - }) - .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) -} diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 7ba1141..4b37b25 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -6,7 +6,6 @@ use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use futures_util::TryStreamExt; -use items_0::on_sitemty_data; use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; @@ -29,6 +28,7 @@ use scyllaconn::worker::ScyllaQueue; use std::net::SocketAddr; use std::pin::Pin; use streamio::tcpreadasbytes::TcpReadAsBytes; +use streams::frames::frameable_stream_to_bytes_stream; use streams::frames::inmem::BoxedBytesStream; use streams::frames::inmem::InMemoryFrameStream; use streams::tcprawclient::TEST_BACKEND; @@ -56,6 +56,7 @@ pub enum Error { Framable(#[from] items_2::framable::Error), Frame(#[from] items_2::frame::Error), InMem(#[from] streams::frames::inmem::Error), + FramedStream(#[from] streams::frames::Error), } pub async fn events_service(ncc: NodeConfigCached) -> Result<(), Error> { @@ -139,23 +140,7 @@ pub async fn create_response_bytes_stream( Ok(ret) } else { let stream = make_channel_events_stream_data(evq, reqctx, scyqueue, ncc).await?; - let stream = stream.map(move |x| { - on_sitemty_data!(x, |x: ChannelEvents| { - match x { - ChannelEvents::Events(evs) => Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Events(evs), - ))), - ChannelEvents::Status(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Status(x), - ))), - } - }) - }); - let stream = stream.map(|x| { - x.make_frame_dyn() - .map(bytes::BytesMut::freeze) - .map_err(sitem_err2_from_string) - }); + let stream = frameable_stream_to_bytes_stream(stream).map_err(sitem_err2_from_string); let ret = Box::pin(stream); Ok(ret) } diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 9520db3..5e1da21 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -466,14 +466,14 @@ impl Stream for EventsStreamRt { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let mut i = 0; + let mut i = 0usize; loop { i += 1; - if i > 5000 { + if i > 500000000000 { panic!("too many iterations") } if let Some(mut item) = self.out.pop_front() { - if !item.verify() { + if item.is_consistent() == false { warn_item!("{}bad item {:?}", "\n\n--------------------------\n", item); self.state = State::Done; break Ready(Some(Err(Error::BadBatch))); diff --git a/crates/scyllaconn/src/events2/mergertchained.rs b/crates/scyllaconn/src/events2/mergertchained.rs index fbc6441..1793cf4 100644 --- a/crates/scyllaconn/src/events2/mergertchained.rs +++ b/crates/scyllaconn/src/events2/mergertchained.rs @@ -362,8 +362,9 @@ impl Stream for MergeRtsChained { self.out.push_back(x); } if let Some(item) = self.out.pop_front() { - trace_emit!("emit item {} {:?}", items_0::Events::verify(&item), item); - if items_0::Events::verify(&item) != true { + let verified = item.is_consistent(); + trace_emit!("emit item {} {:?}", verified, item); + if verified == false { debug!("{}bad item {:?}", "\n\n--------------------------\n", item); self.state = State::Done; } diff --git a/crates/scyllaconn/src/events2/onebeforeandbulk.rs b/crates/scyllaconn/src/events2/onebeforeandbulk.rs index 27fe4f4..8c9c0ae 100644 --- a/crates/scyllaconn/src/events2/onebeforeandbulk.rs +++ b/crates/scyllaconn/src/events2/onebeforeandbulk.rs @@ -1,12 +1,10 @@ use daqbuf_err as err; use err::thiserror; -use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::merge::DrainIntoDstResult; use items_0::merge::DrainIntoNewResult; use items_0::merge::MergeableTy; -use items_0::Events; use netpod::log::*; use netpod::stream_impl_tracer::StreamImplTracer; use netpod::TsNano; @@ -38,7 +36,7 @@ macro_rules! tracer_loop_enter { #[allow(unused)] macro_rules! debug_fetch { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } -#[derive(Debug, ThisError)] +#[derive(Debug, thiserror::Error)] #[cstm(name = "EventsOneBeforeAndBulk")] pub enum Error { Unordered, @@ -123,7 +121,7 @@ where impl Stream for OneBeforeAndBulk where S: Stream> + Unpin, - T: Events + MergeableTy + Unpin, + T: MergeableTy + Unpin, E: std::error::Error + Send + 'static, { type Item = Result, Error>; @@ -149,7 +147,7 @@ where self.tslast = MergeableTy::ts_max(&item).unwrap(); } } - if item.verify() != true { + if item.is_consistent() == false { self.state = State::Done; let e = Error::Unordered; Ready(Some(Err(e))) @@ -165,14 +163,80 @@ where } } // Separate events into before and bulk - let tss = Events::tss(&item); - let pp = tss.partition_point(|&x| x < self.ts0.ns()); - trace_transition!("partition_point {pp:?} {n:?}", n = tss.len()); - if pp > item.len() { - error!("bad partition point {} {}", pp, item.len()); - self.state = State::Done; - Ready(Some(Err(Error::Logic))) - } else if pp == item.len() { + let ppp = MergeableTy::find_lowest_index_ge(&item, self.ts0); + trace_transition!("partition_point {ppp:?} {n:?}", n = item.len()); + if let Some(pp) = ppp { + if pp == 0 { + // all entries are bulk + trace_transition!("transition with bulk to Bulk"); + self.state = State::Bulk; + if let Some(before) = self.consume_buf_get_latest() { + self.out.push_back(item); + let item = Output::Before(before); + trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item); + Ready(Some(Ok(item))) + } else { + let item = Output::Bulk(item); + trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item); + Ready(Some(Ok(item))) + } + } else { + // mixed + trace_transition!("transition with mixed to Bulk"); + self.state = State::Bulk; + match self.buf.as_mut() { + Some(buf) => match item.drain_into(buf, 0..pp) { + DrainIntoDstResult::Done => { + if let Some(before) = self.consume_buf_get_latest() { + self.out.push_back(item); + let item = Output::Before(before); + trace_emit!( + "State::Begin Before {} emit {:?}", + self.dbgname, + item + ); + Ready(Some(Ok(item))) + } else { + let item = Output::Bulk(item); + trace_emit!( + "State::Begin Bulk {} emit {:?}", + self.dbgname, + item + ); + Ready(Some(Ok(item))) + } + } + DrainIntoDstResult::Partial => panic!(), + DrainIntoDstResult::NotCompatible => panic!(), + }, + None => match item.drain_into_new(0..pp) { + DrainIntoNewResult::Done(buf) => { + self.buf = Some(buf); + if let Some(before) = self.consume_buf_get_latest() { + self.out.push_back(item); + let item = Output::Before(before); + trace_emit!( + "State::Begin Before {} emit {:?}", + self.dbgname, + item + ); + Ready(Some(Ok(item))) + } else { + let item = Output::Bulk(item); + trace_emit!( + "State::Begin Bulk {} emit {:?}", + self.dbgname, + item + ); + Ready(Some(Ok(item))) + } + } + DrainIntoNewResult::Partial(_) => panic!(), + DrainIntoNewResult::NotCompatible => panic!(), + }, + } + } + } else { // all entries are before, or empty item trace_transition!("stay in Begin"); trace_emit!( @@ -195,75 +259,6 @@ where DrainIntoNewResult::NotCompatible => panic!(), }, } - } else if pp == 0 { - // all entries are bulk - trace_transition!("transition with bulk to Bulk"); - self.state = State::Bulk; - if let Some(before) = self.consume_buf_get_latest() { - self.out.push_back(item); - let item = Output::Before(before); - trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item); - Ready(Some(Ok(item))) - } else { - let item = Output::Bulk(item); - trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item); - Ready(Some(Ok(item))) - } - } else { - // mixed - trace_transition!("transition with mixed to Bulk"); - self.state = State::Bulk; - match self.buf.as_mut() { - Some(buf) => match item.drain_into(buf, 0..pp) { - DrainIntoDstResult::Done => { - if let Some(before) = self.consume_buf_get_latest() { - self.out.push_back(item); - let item = Output::Before(before); - trace_emit!( - "State::Begin Before {} emit {:?}", - self.dbgname, - item - ); - Ready(Some(Ok(item))) - } else { - let item = Output::Bulk(item); - trace_emit!( - "State::Begin Bulk {} emit {:?}", - self.dbgname, - item - ); - Ready(Some(Ok(item))) - } - } - DrainIntoDstResult::Partial => panic!(), - DrainIntoDstResult::NotCompatible => panic!(), - }, - None => match item.drain_into_new(0..pp) { - DrainIntoNewResult::Done(buf) => { - self.buf = Some(buf); - if let Some(before) = self.consume_buf_get_latest() { - self.out.push_back(item); - let item = Output::Before(before); - trace_emit!( - "State::Begin Before {} emit {:?}", - self.dbgname, - item - ); - Ready(Some(Ok(item))) - } else { - let item = Output::Bulk(item); - trace_emit!( - "State::Begin Bulk {} emit {:?}", - self.dbgname, - item - ); - Ready(Some(Ok(item))) - } - } - DrainIntoNewResult::Partial(_) => panic!(), - DrainIntoNewResult::NotCompatible => panic!(), - }, - } } } } @@ -304,7 +299,7 @@ where self.tslast = MergeableTy::ts_max(&item).unwrap(); } } - if item.verify() != true { + if item.is_consistent() == false { self.state = State::Done; let e = Error::Unordered; Ready(Some(Err(e)))