From 3ded7c6136b5d006197ab919b505e230c9046810 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 20 Dec 2022 16:36:42 +0100 Subject: [PATCH] Run on cryo --- dbconn/src/search.rs | 2 +- httpret/src/api4/binned.rs | 3 +- nodenet/src/conn.rs | 2 +- scyllaconn/src/events.rs | 342 ++++++++++++++++++---------------- streams/src/collect.rs | 4 +- streams/src/timebinnedjson.rs | 2 +- 6 files changed, 185 insertions(+), 170 deletions(-) diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index e40fb93..3d2e029 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -100,7 +100,7 @@ pub async fn search_channel_scylla( " series, facility, channel, scalar_type, shape_dims", " from series_by_channel", " where channel ~* $1", - " limit 100", + " limit 400000", )); let pgclient = crate::create_connection(pgconf).await?; let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?; diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index 41bd921..9db0347 100644 --- a/httpret/src/api4/binned.rs +++ b/httpret/src/api4/binned.rs @@ -15,7 +15,7 @@ use tracing::Instrument; use url::Url; async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("httpret plain_events_json req: {:?}", req); + debug!("httpret plain_events_json req: {:?}", req); let (_head, _body) = req.into_parts(); let query = BinnedQuery::from_url(&url).map_err(|e| { let msg = format!("can not parse query: {}", e.msg()); @@ -46,7 +46,6 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache } async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("req: {:?}", req); let accept = req .headers() .get(http::header::ACCEPT) diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 29b7367..103d7b7 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -252,7 +252,7 @@ async fn events_conn_handler_inner_try( let item = item.make_frame(); match item { Ok(buf) => { - info!("write {} bytes", buf.len()); + trace!("write {} bytes", buf.len()); buf_len_histo.ingest(buf.len() as u32); match netout.write_all(&buf).await { Ok(_) => {} diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index cf39829..20ad709 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -1,23 +1,44 @@ use crate::errconv::ErrConv; use err::Error; -use futures_util::{Future, FutureExt, Stream, StreamExt}; -use items_0::{Empty, Events, WithLen}; -use items_2::channelevents::{ChannelEvents, ConnStatus, ConnStatusEvent}; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::Empty; +use items_0::Events; +use items_0::WithLen; +use items_2::channelevents::ChannelEvents; +use items_2::channelevents::ConnStatus; +use items_2::channelevents::ConnStatusEvent; use items_2::eventsdim0::EventsDim0; use netpod::log::*; use netpod::query::ChannelStateEventsQuery; use netpod::timeunits::*; -use netpod::{NanoRange, ScalarType, Shape}; +use netpod::NanoRange; +use netpod::ScalarType; +use netpod::Shape; use scylla::Session as ScySession; use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; -async fn find_ts_msp(series: u64, range: NanoRange, scy: Arc) -> Result, Error> { - info!("find_ts_msp series {} {:?}", series, range); - let mut ret = VecDeque::new(); +async fn find_ts_msp( + series: u64, + range: NanoRange, + scy: Arc, +) -> Result<(VecDeque, VecDeque), Error> { + trace!("find_ts_msp series {} {:?}", series, range); + let mut ret1 = VecDeque::new(); + let mut ret2 = VecDeque::new(); // TODO use prepared statements + let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2"; + let res = scy.query(cql, (series as i64, range.beg as i64)).await.err_conv()?; + for row in res.rows_typed_or_empty::<(i64,)>() { + let row = row.err_conv()?; + ret1.push_front(row.0 as u64); + } let cql = "select ts_msp from ts_msp where series = ? and ts_msp > ? and ts_msp < ?"; let res = scy .query(cql, (series as i64, range.beg as i64, range.end as i64)) @@ -25,22 +46,16 @@ async fn find_ts_msp(series: u64, range: NanoRange, scy: Arc) -> Res .err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; - ret.push_back(row.0 as u64); + ret2.push_back(row.0 as u64); } let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? limit 1"; let res = scy.query(cql, (series as i64, range.end as i64)).await.err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; - ret.push_back(row.0 as u64); + ret2.push_back(row.0 as u64); } - let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2"; - let res = scy.query(cql, (series as i64, range.beg as i64)).await.err_conv()?; - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; - ret.push_front(row.0 as u64); - } - trace!("found in total {} rows", ret.len()); - Ok(ret) + trace!("find_ts_msp n1 {} n2 {}", ret1.len(), ret2.len()); + Ok((ret1, ret2)) } macro_rules! read_next_scalar_values { @@ -50,7 +65,6 @@ macro_rules! read_next_scalar_values { ts_msp: u64, range: NanoRange, fwd: bool, - do_one_before: bool, scy: Arc, ) -> Result, Error> { type ST = $st; @@ -64,7 +78,7 @@ macro_rules! read_next_scalar_values { if range.end > i64::MAX as u64 { return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); } - let res = if fwd { + let ret = if fwd { let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; trace!( @@ -74,7 +88,7 @@ macro_rules! read_next_scalar_values { ts_lsp_max, range.beg, range.end, - stringify!($fname) + stringify!($table_name) ); // TODO use prepared! let cql = concat!( @@ -82,18 +96,37 @@ macro_rules! read_next_scalar_values { $table_name, " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" ); - scy.query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64)) + let res = scy + .query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64)) .await - .err_conv()? + .err_conv()?; + let mut last_before = None; + let mut ret = EventsDim0::::empty(); + for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = row.2 as ST; + if ts >= range.end { + } else if ts >= range.beg { + ret.push(ts, pulse, value); + } else { + if last_before.is_none() { + warn!("encounter event before range in forward read {ts}"); + } + last_before = Some((ts, pulse, value)); + } + } + ret } else { let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - info!( + trace!( "BCK ts_msp {} ts_lsp_max {} beg {} end {} {}", ts_msp, ts_lsp_max, range.beg, range.end, - stringify!($fname) + stringify!($table_name) ); // TODO use prepared! let cql = concat!( @@ -101,30 +134,29 @@ macro_rules! read_next_scalar_values { $table_name, " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" ); - scy.query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) + let res = scy + .query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) .await - .err_conv()? + .err_conv()?; + let mut seen_before = false; + let mut ret = EventsDim0::::empty(); + for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = row.2 as ST; + if ts >= range.end { + } else if ts < range.beg { + ret.push(ts, pulse, value); + } else { + if !seen_before { + warn!("encounter event before range in forward read {ts}"); + } + seen_before = true; + } + } + ret }; - let mut last_before = None; - let mut ret = EventsDim0::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2 as ST; - if ts >= range.end { - } else if ts >= range.beg { - ret.push(ts, pulse, value); - } else { - last_before = Some((ts, pulse, value)); - } - } - if do_one_before { - if let Some((ts, pulse, value)) = last_before { - info!("PREPENDING THE LAST BEFORE {ts} {value:?}"); - ret.push_front(ts, pulse, value); - } - } trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); Ok(ret) } @@ -138,7 +170,6 @@ macro_rules! read_next_array_values { ts_msp: u64, _range: NanoRange, _fwd: bool, - _do_one_before: bool, scy: Arc, ) -> Result, Error> { // TODO change return type: so far EventsDim1 does not exist. @@ -149,7 +180,7 @@ macro_rules! read_next_array_values { } type ST = $st; type _SCYTY = $scyty; - info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); + trace!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); let cql = concat!( "select ts_lsp, pulse, value from ", $table_name, @@ -172,9 +203,14 @@ macro_rules! read_next_array_values { }; } +read_next_scalar_values!(read_next_values_scalar_u8, u8, i8, "events_scalar_u8"); +read_next_scalar_values!(read_next_values_scalar_u16, u16, i16, "events_scalar_u16"); +read_next_scalar_values!(read_next_values_scalar_u32, u32, i32, "events_scalar_u32"); +read_next_scalar_values!(read_next_values_scalar_u64, u64, i64, "events_scalar_u64"); read_next_scalar_values!(read_next_values_scalar_i8, i8, i8, "events_scalar_i8"); read_next_scalar_values!(read_next_values_scalar_i16, i16, i16, "events_scalar_i16"); read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32"); +read_next_scalar_values!(read_next_values_scalar_i64, i64, i64, "events_scalar_i64"); read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); @@ -182,14 +218,7 @@ read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16") macro_rules! read_values { ($fname:ident, $self:expr, $ts_msp:expr) => {{ - let fut = $fname( - $self.series, - $ts_msp, - $self.range.clone(), - $self.fwd, - $self.do_one_before_range, - $self.scy.clone(), - ); + let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone()); let fut = fut.map(|x| match x { Ok(k) => { let self_name = std::any::type_name::(); @@ -211,7 +240,6 @@ struct ReadValues { range: NanoRange, ts_msps: VecDeque, fwd: bool, - do_one_before_range: bool, fut: Pin, Error>> + Send>>, scy: Arc, } @@ -224,7 +252,6 @@ impl ReadValues { range: NanoRange, ts_msps: VecDeque, fwd: bool, - do_one_before_range: bool, scy: Arc, ) -> Self { let mut ret = Self { @@ -234,7 +261,6 @@ impl ReadValues { range, ts_msps, fwd, - do_one_before_range, fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( "future not initialized", )))), @@ -246,20 +272,28 @@ impl ReadValues { fn next(&mut self) -> bool { if let Some(ts_msp) = self.ts_msps.pop_front() { - self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 0); + self.fut = self.make_fut(ts_msp); true } else { false } } - fn make_fut( - &mut self, - ts_msp: u64, - _has_more_msp: bool, - ) -> Pin, Error>> + Send>> { + fn make_fut(&mut self, ts_msp: u64) -> Pin, Error>> + Send>> { let fut = match &self.shape { Shape::Scalar => match &self.scalar_type { + ScalarType::U8 => { + read_values!(read_next_values_scalar_u8, self, ts_msp) + } + ScalarType::U16 => { + read_values!(read_next_values_scalar_u16, self, ts_msp) + } + ScalarType::U32 => { + read_values!(read_next_values_scalar_u32, self, ts_msp) + } + ScalarType::U64 => { + read_values!(read_next_values_scalar_u64, self, ts_msp) + } ScalarType::I8 => { read_values!(read_next_values_scalar_i8, self, ts_msp) } @@ -269,6 +303,9 @@ impl ReadValues { ScalarType::I32 => { read_values!(read_next_values_scalar_i32, self, ts_msp) } + ScalarType::I64 => { + read_values!(read_next_values_scalar_i64, self, ts_msp) + } ScalarType::F32 => { read_values!(read_next_values_scalar_f32, self, ts_msp) } @@ -300,7 +337,7 @@ impl ReadValues { enum FrState { New, - FindMsp(Pin, Error>> + Send>>), + FindMsp(Pin, VecDeque), Error>> + Send>>), ReadBack1(ReadValues), ReadBack2(ReadValues), ReadValues(ReadValues), @@ -313,10 +350,14 @@ pub struct EventsStreamScylla { scalar_type: ScalarType, shape: Shape, range: NanoRange, + #[allow(unused)] do_one_before_range: bool, + ts_msp_b1: Option, + ts_msp_b2: Option, ts_msps: VecDeque, scy: Arc, do_test_stream_error: bool, + outqueue: VecDeque>, } impl EventsStreamScylla { @@ -329,8 +370,6 @@ impl EventsStreamScylla { scy: Arc, do_test_stream_error: bool, ) -> Self { - let self_name = std::any::type_name::(); - info!("{self_name} do_one_before_range {do_one_before_range}"); Self { state: FrState::New, series, @@ -338,37 +377,41 @@ impl EventsStreamScylla { shape, range, do_one_before_range, + ts_msp_b1: None, + ts_msp_b2: None, ts_msps: VecDeque::new(), scy, do_test_stream_error, + outqueue: VecDeque::new(), } } - fn ts_msps_found(&mut self, ts_msps: VecDeque) { - info!("ts_msps_found ts_msps {ts_msps:?}"); - self.ts_msps = ts_msps; - // Find the largest MSP which can potentially contain some event before the range. - let befores: Vec<_> = self - .ts_msps - .iter() - .map(|x| *x) - .filter(|x| *x < self.range.beg) - .collect(); - if self.do_one_before_range && befores.len() >= 1 { - info!("Try ReadBack1"); + fn ts_msps_found(&mut self, msps1: VecDeque, msps2: VecDeque) { + trace!("ts_msps_found msps1 {msps1:?} msps2 {msps2:?}"); + let mut msps1 = msps1; + self.ts_msp_b1 = msps1.pop_back(); + self.ts_msp_b2 = msps1.pop_back(); + self.ts_msps = msps2; + if let Some(x) = self.ts_msp_b1.clone() { + self.ts_msps.push_front(x); + } + trace!("ts_msp_b1 {:?}", self.ts_msp_b1); + trace!("ts_msp_b2 {:?}", self.ts_msp_b2); + trace!("ts_msps {:?}", self.ts_msps); + if let Some(msp) = self.ts_msp_b1.clone() { + trace!("Try ReadBack1"); let st = ReadValues::new( self.series as i64, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), - [befores[befores.len() - 1]].into(), + [msp].into(), false, - self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadBack1(st); } else if self.ts_msps.len() >= 1 { - info!("Go straight for forward read"); + trace!("Go straight for forward read"); let st = ReadValues::new( self.series as i64, self.scalar_type.clone(), @@ -376,7 +419,6 @@ impl EventsStreamScylla { self.range.clone(), self.ts_msps.clone(), true, - self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadValues(st); @@ -385,51 +427,10 @@ impl EventsStreamScylla { } } - fn back_1_done(&mut self, item: Box) -> Option> { - info!("back_1_done len {}", item.len()); - if item.len() == 0 { - info!("ReadBack1 returned empty"); - // Find the 2nd largest MSP which can potentially contain some event before the range. - let befores: Vec<_> = self - .ts_msps - .iter() - .map(|x| *x) - .filter(|x| *x < self.range.beg) - .collect(); - if befores.len() >= 2 { - info!("Try ReadBack2"); - let st = ReadValues::new( - self.series as i64, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - [befores[befores.len() - 2]].into(), - false, - self.do_one_before_range, - self.scy.clone(), - ); - self.state = FrState::ReadBack2(st); - None - } else if self.ts_msps.len() >= 1 { - info!("No 2nd back MSP, go for forward read"); - let st = ReadValues::new( - self.series as i64, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - self.ts_msps.clone(), - true, - self.do_one_before_range, - self.scy.clone(), - ); - self.state = FrState::ReadValues(st); - None - } else { - self.state = FrState::Done; - None - } - } else { - info!("FOUND ONE BEFORE"); + fn back_1_done(&mut self, item: Box) { + trace!("back_1_done item len {}", item.len()); + if item.len() > 0 { + self.outqueue.push_back(item); if self.ts_msps.len() > 0 { let st = ReadValues::new( self.series as i64, @@ -438,22 +439,47 @@ impl EventsStreamScylla { self.range.clone(), self.ts_msps.clone(), true, - self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadValues(st); - Some(item) } else { self.state = FrState::Done; - Some(item) + } + } else { + if let Some(msp) = self.ts_msp_b2.clone() { + trace!("Try ReadBack2"); + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + [msp].into(), + false, + self.scy.clone(), + ); + self.state = FrState::ReadBack2(st); + } else if self.ts_msps.len() >= 1 { + trace!("No 2nd back MSP, go for forward read"); + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.ts_msps.clone(), + true, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + } else { + self.state = FrState::Done; } } } - fn back_2_done(&mut self, item: Box) -> Option> { - info!("back_2_done len {}", item.len()); - if item.len() == 0 { - info!("ReadBack2 returned empty"); + fn back_2_done(&mut self, item: Box) { + trace!("back_1_done item len {}", item.len()); + if item.len() > 0 { + self.outqueue.push_back(item); } if self.ts_msps.len() >= 1 { let st = ReadValues::new( @@ -463,18 +489,12 @@ impl EventsStreamScylla { self.range.clone(), self.ts_msps.clone(), true, - self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadValues(st); } else { self.state = FrState::Done; } - if item.len() > 0 { - Some(item) - } else { - None - } } } @@ -489,6 +509,11 @@ impl Stream for EventsStreamScylla { return Ready(Some(Err(e))); } loop { + if let Some(item) = self.outqueue.pop_front() { + item.verify(); + item.output_info(); + break Ready(Some(Ok(ChannelEvents::Events(item)))); + } break match self.state { FrState::New => { let fut = find_ts_msp(self.series, self.range.clone(), self.scy.clone()); @@ -497,8 +522,8 @@ impl Stream for EventsStreamScylla { continue; } FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok(ts_msps)) => { - self.ts_msps_found(ts_msps); + Ready(Ok((msps1, msps2))) => { + self.ts_msps_found(msps1, msps2); continue; } Ready(Err(e)) => { @@ -509,13 +534,8 @@ impl Stream for EventsStreamScylla { }, FrState::ReadBack1(ref mut st) => match st.fut.poll_unpin(cx) { Ready(Ok(item)) => { - if let Some(item) = self.back_1_done(item) { - item.verify(); - item.output_info(); - Ready(Some(Ok(ChannelEvents::Events(item)))) - } else { - continue; - } + self.back_1_done(item); + continue; } Ready(Err(e)) => { self.state = FrState::Done; @@ -525,13 +545,8 @@ impl Stream for EventsStreamScylla { }, FrState::ReadBack2(ref mut st) => match st.fut.poll_unpin(cx) { Ready(Ok(item)) => { - if let Some(item) = self.back_2_done(item) { - item.verify(); - item.output_info(); - Ready(Some(Ok(ChannelEvents::Events(item)))) - } else { - continue; - } + self.back_2_done(item); + continue; } Ready(Err(e)) => { self.state = FrState::Done; @@ -541,13 +556,14 @@ impl Stream for EventsStreamScylla { }, FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { Ready(Ok(item)) => { - item.verify(); - item.output_info(); if !st.next() { - info!("ReadValues exhausted"); + trace!("ReadValues exhausted"); self.state = FrState::Done; } - Ready(Some(Ok(ChannelEvents::Events(item)))) + if item.len() > 0 { + self.outqueue.push_back(item); + } + continue; } Ready(Err(e)) => Ready(Some(Err(e))), Pending => Pending, diff --git a/streams/src/collect.rs b/streams/src/collect.rs index f9810e7..cced8a2 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -57,7 +57,7 @@ where break; } }; - info!("collect_in_span see item"); + debug!("collect_in_span see item"); match item { Ok(item) => match item { StreamItem::DataItem(item) => match item { @@ -70,7 +70,7 @@ where } } RangeCompletableItem::Data(mut item) => { - info!("collect_in_span sees {}", item.len()); + debug!("collect_in_span sees {}", item.len()); if collector.is_none() { let c = item.new_collector(); collector = Some(c); diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index ad62074..a28f1b2 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -35,7 +35,7 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu ); let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&rawquery, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader: - let stream = { items_2::merger::Merger::new(inps, 1) }; + let stream = { items_2::merger::Merger::new(inps, 128) }; let stream = stream::iter([empty]).chain(stream); let stream = Box::pin(stream); let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline);