diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 0fe67fb..3d4f20a 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -223,15 +223,6 @@ dependencies = [ "libc", ] -[[package]] -name = "block-buffer" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" -dependencies = [ - "generic-array", -] - [[package]] name = "block-buffer" version = "0.10.3" @@ -706,22 +697,13 @@ dependencies = [ "tracing", ] -[[package]] -name = "digest" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" -dependencies = [ - "generic-array", -] - [[package]] name = "digest" version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" dependencies = [ - "block-buffer 0.10.3", + "block-buffer", "crypto-common", "subtle", ] @@ -1163,7 +1145,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest 0.10.6", + "digest", ] [[package]] @@ -1241,12 +1223,11 @@ dependencies = [ "futures-util", "http", "hyper", - "hyper-tls", "items", "items_0", "items_2", "itertools", - "md-5 0.9.1", + "md-5", "netpod", "nodenet", "parse", @@ -1561,24 +1542,13 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" -[[package]] -name = "md-5" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" -dependencies = [ - "block-buffer 0.9.0", - "digest 0.9.0", - "opaque-debug", -] - [[package]] name = "md-5" version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "digest 0.10.6", + "digest", ] [[package]] @@ -1810,12 +1780,6 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" -[[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" - [[package]] name = "openssl" version = "0.10.44" @@ -2018,7 +1982,7 @@ dependencies = [ "bytes", "fallible-iterator", "hmac", - "md-5 0.10.5", + "md-5", "memchr", "rand", "sha2", @@ -2283,11 +2247,12 @@ checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" [[package]] name = "scylla" -version = "0.5.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eed40c02a5c835d894044bbd6f454ef54b6937ec8af23efe3cdeaa57005bb00d" +checksum = "345a33a39eb25c82e48f7e82cb7378ec724e72a48579bb9c83e1c511d5b44fb6" dependencies = [ "arc-swap", + "async-trait", "bigdecimal", "byteorder", "bytes", @@ -2314,9 +2279,9 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "0.0.1" +version = "0.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e0ac8202660385589dba16123cc5916cf4eb3078e2dac0ab6af6903d7f5467" +checksum = "f01015d74993239c1ad6fa89e4c0baed8af37efaaaf1fb9a5abecc119a7e31c6" dependencies = [ "bigdecimal", "byteorder", @@ -2448,7 +2413,7 @@ checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.6", + "digest", ] [[package]] diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index 44362fe..3b33acf 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -22,7 +22,7 @@ bytes = "1.2" pin-project = "1" #async-channel = "1" #dashmap = "3" -scylla = "0.5" +scylla = "0.6.1" async-channel = "1.6" chrono = "0.4" regex = "1.5.4" diff --git a/dbconn/src/bincache.rs b/dbconn/src/bincache.rs deleted file mode 100644 index 7b054b8..0000000 --- a/dbconn/src/bincache.rs +++ /dev/null @@ -1,470 +0,0 @@ -use crate::events_scylla::EventsStreamScylla; -use crate::ErrConv; -use err::Error; -use futures_util::{Future, Stream, StreamExt}; -use items::binsdim0::MinMaxAvgDim0Bins; -use items::{empty_binned_dyn, empty_events_dyn, RangeCompletableItem, StreamItem, TimeBinned}; -use netpod::log::*; -use netpod::query::{CacheUsage, PlainEventsQuery}; -use netpod::timeunits::*; -use netpod::{ - AggKind, ChannelTyped, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ScalarType, ScyllaConfig, - Shape, -}; -use scylla::Session as ScySession; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; - -pub async fn read_cached_scylla( - series: u64, - chn: &ChannelTyped, - coord: &PreBinnedPatchCoord, - _agg_kind: AggKind, - scy: &ScySession, -) -> Result>, Error> { - let vals = ( - series as i64, - (coord.bin_t_len() / SEC) as i32, - (coord.patch_t_len() / SEC) as i32, - coord.ix() as i64, - ); - let res = scy - .query_iter( - "select counts, avgs, mins, maxs from binned_scalar_f32 where series = ? and bin_len_sec = ? and patch_len_sec = ? and agg_kind = 'dummy-agg-kind' and offset = ?", - vals, - ) - .await; - let mut res = res.err_conv().map_err(|e| { - error!("can not read from cache"); - e - })?; - while let Some(item) = res.next().await { - let row = item.err_conv()?; - let edges = coord.edges(); - let (counts, avgs, mins, maxs): (Vec, Vec, Vec, Vec) = row.into_typed().err_conv()?; - let mut counts_mismatch = false; - if edges.len() != counts.len() + 1 { - counts_mismatch = true; - } - if counts.len() != avgs.len() { - counts_mismatch = true; - } - let ts1s = edges[..(edges.len() - 1).min(edges.len())].to_vec(); - let ts2s = edges[1.min(edges.len())..].to_vec(); - if ts1s.len() != ts2s.len() { - error!("ts1s vs ts2s mismatch"); - counts_mismatch = true; - } - if ts1s.len() != counts.len() { - counts_mismatch = true; - } - let avgs = avgs.into_iter().map(|x| x).collect::>(); - let mins = mins.into_iter().map(|x| x as _).collect::>(); - let maxs = maxs.into_iter().map(|x| x as _).collect::>(); - if counts_mismatch { - error!( - "mismatch: edges {} ts1s {} ts2s {} counts {} avgs {} mins {} maxs {}", - edges.len(), - ts1s.len(), - ts2s.len(), - counts.len(), - avgs.len(), - mins.len(), - maxs.len(), - ); - } - let counts: Vec<_> = counts.into_iter().map(|x| x as u64).collect(); - // TODO construct a dyn TimeBinned using the scalar type and shape information. - // TODO place the values with little copying into the TimeBinned. - use ScalarType::*; - use Shape::*; - match &chn.shape { - Scalar => match &chn.scalar_type { - F64 => { - let ret = MinMaxAvgDim0Bins:: { - ts1s, - ts2s, - counts, - avgs, - mins, - maxs, - }; - return Ok(Some(Box::new(ret))); - } - _ => { - error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape); - err::todoval() - } - }, - _ => { - error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape); - err::todoval() - } - } - } - Ok(None) -} - -#[allow(unused)] -struct WriteFut<'a> { - chn: &'a ChannelTyped, - coord: &'a PreBinnedPatchCoord, - data: &'a dyn TimeBinned, - scy: &'a ScySession, -} - -impl<'a> WriteFut<'a> { - fn new( - chn: &'a ChannelTyped, - coord: &'a PreBinnedPatchCoord, - data: &'a dyn TimeBinned, - scy: &'a ScySession, - ) -> Self { - Self { chn, coord, data, scy } - } -} - -impl<'a> Future for WriteFut<'a> { - type Output = Result<(), Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let _ = cx; - Poll::Ready(Ok(())) - } -} - -pub fn write_cached_scylla<'a>( - series: u64, - chn: &ChannelTyped, - coord: &'a PreBinnedPatchCoord, - data: &'a dyn TimeBinned, - scy: &ScySession, -) -> Pin> + Send + 'a>> { - let _chn = unsafe { &*(chn as *const ChannelTyped) }; - let data = unsafe { &*(data as *const dyn TimeBinned) }; - let scy = unsafe { &*(scy as *const ScySession) }; - let fut = async move { - let bin_len_sec = (coord.bin_t_len() / SEC) as i32; - let patch_len_sec = (coord.patch_t_len() / SEC) as i32; - let offset = coord.ix(); - warn!( - "write_cached_scylla len {} where series = {} and bin_len_sec = {} and patch_len_sec = {} and agg_kind = 'dummy-agg-kind' and offset = {}", - data.counts().len(), - series, - bin_len_sec, - patch_len_sec, - offset, - ); - let stmt = scy.prepare("insert into binned_scalar_f32 (series, bin_len_sec, patch_len_sec, agg_kind, offset, counts, avgs, mins, maxs) values (?, ?, ?, 'dummy-agg-kind', ?, ?, ?, ?, ?)").await.err_conv()?; - scy.execute( - &stmt, - ( - series as i64, - bin_len_sec, - patch_len_sec, - offset as i64, - data.counts().iter().map(|x| *x as i64).collect::>(), - data.avgs(), - data.mins(), - data.maxs(), - ), - ) - .await - .err_conv() - .map_err(|e| { - error!("can not write to cache"); - e - })?; - Ok(()) - }; - Box::pin(fut) -} - -pub async fn fetch_uncached_data( - series: u64, - chn: ChannelTyped, - coord: PreBinnedPatchCoord, - agg_kind: AggKind, - cache_usage: CacheUsage, - scy: Arc, -) -> Result, bool)>, Error> { - info!("fetch_uncached_data {coord:?}"); - // Try to find a higher resolution pre-binned grid which covers the requested patch. - let (bin, complete) = match PreBinnedPatchRange::covering_range(coord.patch_range(), coord.bin_count() + 1) { - Ok(Some(range)) => { - if coord.patch_range() != range.range() { - error!( - "The chosen covering range does not exactly cover the requested patch {:?} vs {:?}", - coord.patch_range(), - range.range() - ); - } - fetch_uncached_higher_res_prebinned( - series, - &chn, - coord.clone(), - range, - agg_kind, - cache_usage.clone(), - scy.clone(), - ) - .await - } - Ok(None) => fetch_uncached_binned_events(series, &chn, coord.clone(), agg_kind, scy.clone()).await, - Err(e) => Err(e), - }?; - if true || complete { - let edges = coord.edges(); - if edges.len() < bin.len() + 1 { - error!( - "attempt to write overfull bin to cache edges {} bin {}", - edges.len(), - bin.len() - ); - return Err(Error::with_msg_no_trace(format!( - "attempt to write overfull bin to cache" - ))); - } else if edges.len() > bin.len() + 1 { - let missing = edges.len() - bin.len() - 1; - error!("attempt to write incomplete bin to cache missing {missing}"); - } - if let CacheUsage::Use | CacheUsage::Recreate = &cache_usage { - WriteFut::new(&chn, &coord, bin.as_ref(), &scy).await?; - write_cached_scylla(series, &chn, &coord, bin.as_ref(), &scy).await?; - } - } - Ok(Some((bin, complete))) -} - -pub fn fetch_uncached_data_box( - series: u64, - chn: &ChannelTyped, - coord: &PreBinnedPatchCoord, - agg_kind: AggKind, - cache_usage: CacheUsage, - scy: Arc, -) -> Pin, bool)>, Error>> + Send>> { - Box::pin(fetch_uncached_data( - series, - chn.clone(), - coord.clone(), - agg_kind, - cache_usage, - scy, - )) -} - -pub async fn fetch_uncached_higher_res_prebinned( - series: u64, - chn: &ChannelTyped, - coord: PreBinnedPatchCoord, - range: PreBinnedPatchRange, - agg_kind: AggKind, - cache_usage: CacheUsage, - scy: Arc, -) -> Result<(Box, bool), Error> { - let edges = coord.edges(); - // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. - let do_time_weight = true; - // We must produce some result with correct types even if upstream delivers nothing at all. - let bin0 = empty_binned_dyn(&chn.scalar_type, &chn.shape, &agg_kind); - let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); - let mut complete = true; - let patch_it = PreBinnedPatchIterator::from_range(range.clone()); - for patch_coord in patch_it { - // We request data here for a Coord, meaning that we expect to receive multiple bins. - // The expectation is that we receive a single TimeBinned which contains all bins of that PatchCoord. - //let patch_coord = PreBinnedPatchCoord::new(patch.bin_t_len(), patch.patch_t_len(), patch.ix()); - let (bin, comp) = pre_binned_value_stream_with_scy( - series, - chn, - &patch_coord, - agg_kind.clone(), - cache_usage.clone(), - scy.clone(), - ) - .await?; - if let Err(msg) = bin.validate() { - error!( - "pre-binned intermediate issue {} coord {:?} patch_coord {:?}", - msg, coord, patch_coord - ); - } - complete = complete && comp; - time_binner.ingest(bin.as_time_binnable_dyn()); - } - // Fixed limit to defend against a malformed implementation: - let mut i = 0; - while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize { - let n1 = time_binner.bins_ready_count(); - if false { - trace!( - "pre-binned extra cycle {} {} {}", - i, - time_binner.bins_ready_count(), - coord.bin_count() - ); - } - time_binner.cycle(); - i += 1; - if time_binner.bins_ready_count() <= n1 { - warn!("pre-binned cycle did not add another bin, break"); - break; - } - } - if time_binner.bins_ready_count() < coord.bin_count() as usize { - return Err(Error::with_msg_no_trace(format!( - "pre-binned unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}", - time_binner.bins_ready_count(), - coord.bin_count(), - edges.len(), - ))); - } - let ready = time_binner - .bins_ready() - .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch range")))?; - if let Err(msg) = ready.validate() { - error!("pre-binned final issue {} coord {:?}", msg, coord); - } - Ok((ready, complete)) -} - -pub async fn fetch_uncached_binned_events( - series: u64, - chn: &ChannelTyped, - coord: PreBinnedPatchCoord, - agg_kind: AggKind, - scy: Arc, -) -> Result<(Box, bool), Error> { - let edges = coord.edges(); - // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. - let do_time_weight = true; - // We must produce some result with correct types even if upstream delivers nothing at all. - let bin0 = empty_events_dyn(&chn.scalar_type, &chn.shape, &agg_kind); - let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); - let deadline = Instant::now(); - let deadline = deadline - .checked_add(Duration::from_millis(6000)) - .ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?; - let evq = PlainEventsQuery::new( - chn.channel.clone(), - coord.patch_range(), - agg_kind, - Duration::from_millis(6000), - None, - true, - ); - let mut events_dyn = EventsStreamScylla::new(series, &evq, chn.scalar_type.clone(), chn.shape.clone(), scy, false); - let mut complete = false; - loop { - let item = tokio::time::timeout_at(deadline.into(), events_dyn.next()).await; - let item = match item { - Ok(Some(k)) => k, - Ok(None) => break, - Err(_) => { - error!("fetch_uncached_binned_events timeout"); - return Err(Error::with_msg_no_trace(format!( - "TODO handle fetch_uncached_binned_events timeout" - ))); - } - }; - match item { - Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => { - time_binner.ingest(item.as_time_binnable_dyn()); - // TODO could also ask the binner here whether we are "complete" to stop sending useless data. - } - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { - complete = true; - } - Ok(StreamItem::Stats(_item)) => { - warn!("TODO forward in stream bincache stats"); - } - Ok(StreamItem::Log(item)) => { - warn!("TODO forward in stream bincache log msg {}", item.msg); - } - Err(e) => return Err(e), - } - } - // Fixed limit to defend against a malformed implementation: - let mut i = 0; - while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize { - let n1 = time_binner.bins_ready_count(); - if false { - trace!( - "events extra cycle {} {} {}", - i, - time_binner.bins_ready_count(), - coord.bin_count() - ); - } - time_binner.cycle(); - i += 1; - if time_binner.bins_ready_count() <= n1 { - warn!("events cycle did not add another bin, break"); - break; - } - } - if time_binner.bins_ready_count() < coord.bin_count() as usize { - return Err(Error::with_msg_no_trace(format!( - "events unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}", - time_binner.bins_ready_count(), - coord.bin_count(), - edges.len(), - ))); - } - let ready = time_binner - .bins_ready() - .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch")))?; - if let Err(msg) = ready.validate() { - error!("time binned invalid {} coord {:?}", msg, coord); - } - Ok((ready, complete)) -} - -pub async fn pre_binned_value_stream_with_scy( - series: u64, - chn: &ChannelTyped, - coord: &PreBinnedPatchCoord, - agg_kind: AggKind, - cache_usage: CacheUsage, - scy: Arc, -) -> Result<(Box, bool), Error> { - trace!("pre_binned_value_stream_with_scy {chn:?} {coord:?}"); - if let (Some(item), CacheUsage::Use) = ( - read_cached_scylla(series, chn, coord, agg_kind.clone(), &scy).await?, - &cache_usage, - ) { - info!("+++++++++++++ GOOD READ"); - Ok((item, true)) - } else { - if let CacheUsage::Use = &cache_usage { - warn!("--+--+--+--+--+--+ NOT YET CACHED"); - } - let res = fetch_uncached_data_box(series, chn, coord, agg_kind, cache_usage, scy).await?; - let (bin, complete) = - res.ok_or_else(|| Error::with_msg_no_trace(format!("pre_binned_value_stream_with_scy got None bin")))?; - Ok((bin, complete)) - } -} - -pub async fn pre_binned_value_stream( - series: u64, - chn: &ChannelTyped, - coord: &PreBinnedPatchCoord, - agg_kind: AggKind, - cache_usage: CacheUsage, - scyconf: &ScyllaConfig, -) -> Result, Error>> + Send>>, Error> { - trace!("pre_binned_value_stream series {series} {chn:?} {coord:?} {scyconf:?}"); - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyconf.hosts) - .use_keyspace(&scyconf.keyspace, true) - .build() - .await - .err_conv()?; - let scy = Arc::new(scy); - let res = pre_binned_value_stream_with_scy(series, chn, coord, agg_kind, cache_usage, scy).await?; - Ok(Box::pin(futures_util::stream::iter([Ok(res.0)]))) -} diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs index 69bd27e..869253e 100644 --- a/dbconn/src/dbconn.rs +++ b/dbconn/src/dbconn.rs @@ -1,18 +1,15 @@ -pub mod bincache; -pub mod events_scylla; +pub mod channelconfig; pub mod scan; pub mod search; + pub mod pg { pub use tokio_postgres::{Client, Error}; } -pub mod channelconfig; use err::Error; -use netpod::{log::*, ScalarType, Shape}; -use netpod::{Channel, Database, NodeConfigCached, ScyllaConfig}; -use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; -use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; -use scylla::Session as ScySession; +use netpod::log::*; +use netpod::{Channel, Database, NodeConfigCached}; +use netpod::{ScalarType, Shape}; use std::sync::Arc; use std::time::Duration; use tokio_postgres::{Client, Client as PgClient, NoTls}; @@ -38,32 +35,6 @@ impl ErrConv for Result> { } } } -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} pub async fn delay_us(mu: u64) { tokio::time::sleep(Duration::from_micros(mu)).await; @@ -95,16 +66,6 @@ pub async fn create_connection(db_config: &Database) -> Result { Ok(cl) } -pub async fn create_scylla_connection(scyconf: &ScyllaConfig) -> Result { - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyconf.hosts) - .use_keyspace(&scyconf.keyspace, true) - .build() - .await - .err_conv()?; - Ok(scy) -} - pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -> Result { let cl = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl diff --git a/dbconn/src/events_scylla.rs b/dbconn/src/events_scylla.rs deleted file mode 100644 index 52fc8ff..0000000 --- a/dbconn/src/events_scylla.rs +++ /dev/null @@ -1,518 +0,0 @@ -use crate::ErrConv; -use err::Error; -use futures_util::{Future, FutureExt, Stream, StreamExt}; -use items::scalarevents::ScalarEvents; -use items::waveevents::WaveEvents; -use items::{EventsDyn, RangeCompletableItem, Sitemty, StreamItem}; -use netpod::log::*; -use netpod::query::{ChannelStateEventsQuery, PlainEventsQuery}; -use netpod::timeunits::DAY; -use netpod::{NanoRange, ScalarType, ScyllaConfig, Shape}; -use scylla::Session as ScySession; -use std::collections::VecDeque; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -macro_rules! read_values { - ($fname:ident, $self:expr, $ts_msp:expr) => {{ - let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone()); - let fut = fut.map(|x| { - match x { - Ok(k) => { - // TODO why static needed? - let b = Box::new(k) as Box; - Ok(b) - } - Err(e) => Err(e), - } - }); - let fut = Box::pin(fut) as Pin, Error>> + Send>>; - fut - }}; -} - -struct ReadValues { - series: i64, - scalar_type: ScalarType, - shape: Shape, - range: NanoRange, - ts_msps: VecDeque, - fwd: bool, - fut: Pin, Error>> + Send>>, - scy: Arc, -} - -impl ReadValues { - fn new( - series: i64, - scalar_type: ScalarType, - shape: Shape, - range: NanoRange, - ts_msps: VecDeque, - fwd: bool, - scy: Arc, - ) -> Self { - let mut ret = Self { - series, - scalar_type, - shape, - range, - ts_msps, - fwd, - fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( - "future not initialized", - )))), - scy, - }; - ret.next(); - ret - } - - fn next(&mut self) -> bool { - if let Some(ts_msp) = self.ts_msps.pop_front() { - self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 1); - true - } else { - false - } - } - - fn make_fut( - &mut self, - ts_msp: u64, - _has_more_msp: bool, - ) -> Pin, Error>> + Send>> { - let fut = match &self.shape { - Shape::Scalar => match &self.scalar_type { - ScalarType::I8 => { - read_values!(read_next_values_scalar_i8, self, ts_msp) - } - ScalarType::I16 => { - read_values!(read_next_values_scalar_i16, self, ts_msp) - } - ScalarType::I32 => { - read_values!(read_next_values_scalar_i32, self, ts_msp) - } - ScalarType::F32 => { - read_values!(read_next_values_scalar_f32, self, ts_msp) - } - ScalarType::F64 => { - read_values!(read_next_values_scalar_f64, self, ts_msp) - } - _ => err::todoval(), - }, - Shape::Wave(_) => match &self.scalar_type { - ScalarType::U16 => { - read_values!(read_next_values_array_u16, self, ts_msp) - } - _ => err::todoval(), - }, - _ => err::todoval(), - }; - fut - } -} - -enum FrState { - New, - FindMsp(Pin, Error>> + Send>>), - ReadBack1(ReadValues), - ReadBack2(ReadValues), - ReadValues(ReadValues), - Done, -} - -pub struct EventsStreamScylla { - state: FrState, - series: u64, - scalar_type: ScalarType, - shape: Shape, - range: NanoRange, - ts_msps: VecDeque, - scy: Arc, - do_test_stream_error: bool, -} - -impl EventsStreamScylla { - pub fn new( - series: u64, - evq: &PlainEventsQuery, - scalar_type: ScalarType, - shape: Shape, - scy: Arc, - do_test_stream_error: bool, - ) -> Self { - Self { - state: FrState::New, - series, - scalar_type, - shape, - range: evq.range().clone(), - ts_msps: VecDeque::new(), - scy, - do_test_stream_error, - } - } - - fn ts_msps_found(&mut self, ts_msps: VecDeque) { - info!("found ts_msps {ts_msps:?}"); - self.ts_msps = ts_msps; - // Find the largest MSP which can potentially contain some event before the range. - let befores: Vec<_> = self - .ts_msps - .iter() - .map(|x| *x) - .filter(|x| *x < self.range.beg) - .collect(); - if befores.len() >= 1 { - let st = ReadValues::new( - self.series as i64, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - [befores[befores.len() - 1]].into(), - false, - self.scy.clone(), - ); - self.state = FrState::ReadBack1(st); - } else if self.ts_msps.len() >= 1 { - let st = ReadValues::new( - self.series as i64, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - self.ts_msps.clone(), - true, - self.scy.clone(), - ); - self.state = FrState::ReadValues(st); - } else { - self.state = FrState::Done; - } - } - - fn back_1_done(&mut self, item: Box) -> Option> { - info!("back_1_done len {}", item.len()); - if item.len() == 0 { - // Find the 2nd largest MSP which can potentially contain some event before the range. - let befores: Vec<_> = self - .ts_msps - .iter() - .map(|x| *x) - .filter(|x| *x < self.range.beg) - .collect(); - if befores.len() >= 2 { - let st = ReadValues::new( - self.series as i64, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - [befores[befores.len() - 2]].into(), - false, - self.scy.clone(), - ); - self.state = FrState::ReadBack2(st); - None - } else if self.ts_msps.len() >= 1 { - let st = ReadValues::new( - self.series as i64, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - self.ts_msps.clone(), - true, - self.scy.clone(), - ); - self.state = FrState::ReadValues(st); - None - } else { - self.state = FrState::Done; - None - } - } else { - if self.ts_msps.len() > 0 { - let st = ReadValues::new( - self.series as i64, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - self.ts_msps.clone(), - true, - self.scy.clone(), - ); - self.state = FrState::ReadValues(st); - Some(item) - } else { - self.state = FrState::Done; - Some(item) - } - } - } - - fn back_2_done(&mut self, item: Box) -> Option> { - info!("back_2_done len {}", item.len()); - if self.ts_msps.len() >= 1 { - let st = ReadValues::new( - self.series as i64, - self.scalar_type.clone(), - self.shape.clone(), - self.range.clone(), - self.ts_msps.clone(), - true, - self.scy.clone(), - ); - self.state = FrState::ReadValues(st); - } else { - self.state = FrState::Done; - } - if item.len() > 0 { - Some(item) - } else { - None - } - } -} - -impl Stream for EventsStreamScylla { - type Item = Sitemty>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - if self.do_test_stream_error { - let e = Error::with_msg(format!("Test PRIVATE STREAM error.")) - .add_public_msg(format!("Test PUBLIC STREAM error.")); - return Ready(Some(Err(e))); - } - loop { - break match self.state { - FrState::New => { - let fut = find_ts_msp(self.series as i64, self.range.clone(), self.scy.clone()); - let fut = Box::pin(fut); - self.state = FrState::FindMsp(fut); - continue; - } - FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok(ts_msps)) => { - self.ts_msps_found(ts_msps); - continue; - } - Ready(Err(e)) => { - self.state = FrState::Done; - Ready(Some(Err(e))) - } - Pending => Pending, - }, - FrState::ReadBack1(ref mut st) => match st.fut.poll_unpin(cx) { - Ready(Ok(item)) => { - if let Some(item) = self.back_1_done(item) { - item.verify(); - item.output_info(); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) - } else { - continue; - } - } - Ready(Err(e)) => { - self.state = FrState::Done; - Ready(Some(Err(e))) - } - Pending => Pending, - }, - FrState::ReadBack2(ref mut st) => match st.fut.poll_unpin(cx) { - Ready(Ok(item)) => { - if let Some(item) = self.back_2_done(item) { - item.verify(); - item.output_info(); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) - } else { - continue; - } - } - Ready(Err(e)) => { - self.state = FrState::Done; - Ready(Some(Err(e))) - } - Pending => Pending, - }, - FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { - Ready(Ok(item)) => { - info!("read values"); - item.verify(); - item.output_info(); - if !st.next() { - info!("ReadValues exhausted"); - self.state = FrState::Done; - } - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) - } - Ready(Err(e)) => Ready(Some(Err(e))), - Pending => Pending, - }, - FrState::Done => Ready(None), - }; - } - } -} - -async fn find_ts_msp(_series: i64, _range: NanoRange, _scy: Arc) -> Result, Error> { - // TODO remove - panic!() -} - -macro_rules! read_next_scalar_values { - ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( - series: i64, - ts_msp: u64, - range: NanoRange, - fwd: bool, - scy: Arc, - ) -> Result, Error> { - type ST = $st; - type SCYTY = $scyty; - if ts_msp >= range.end { - warn!("given ts_msp {} >= range.end {}", ts_msp, range.end); - } - if range.end > i64::MAX as u64 { - return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); - } - let res = if fwd { - let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; - trace!( - "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", - ts_msp, - ts_lsp_min, - ts_lsp_max, - stringify!($fname) - ); - // TODO use prepared! - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" - ); - scy.query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64)) - .await - .err_conv()? - } else { - let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - info!( - "BCK ts_msp {} ts_lsp_max {} range beg {} end {} {}", - ts_msp, - ts_lsp_max, - range.beg, - range.end, - stringify!($fname) - ); - // TODO use prepared! - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" - ); - scy.query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) - .await - .err_conv()? - }; - let mut ret = ScalarEvents::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2 as ST; - ret.push(ts, pulse, value); - } - trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); - Ok(ret) - } - }; -} - -macro_rules! read_next_array_values { - ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( - series: i64, - ts_msp: u64, - _range: NanoRange, - _fwd: bool, - scy: Arc, - ) -> Result, Error> { - if true { - return Err(Error::with_msg_no_trace("redo based on scalar case")); - } - type ST = $st; - type SCYTY = $scyty; - info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ?" - ); - let res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?; - let mut ret = WaveEvents::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2.into_iter().map(|x| x as ST).collect(); - ret.push(ts, pulse, value); - } - info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); - Ok(ret) - } - }; -} - -read_next_scalar_values!(read_next_values_scalar_i8, i8, i8, "events_scalar_i8"); -read_next_scalar_values!(read_next_values_scalar_i16, i16, i16, "events_scalar_i16"); -read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32"); -read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); -read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); - -read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16"); - -pub async fn channel_state_events( - evq: &ChannelStateEventsQuery, - scyco: &ScyllaConfig, -) -> Result, Error> { - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyco.hosts) - .use_keyspace(&scyco.keyspace, true) - .build() - .await - .err_conv()?; - let scy = Arc::new(scy); - let mut ret = Vec::new(); - let div = DAY; - let mut ts_msp = evq.range().beg / div * div; - loop { - let series = (evq - .channel() - .series() - .ok_or(Error::with_msg_no_trace(format!("series id not given"))))?; - let params = (series as i64, ts_msp as i64); - let mut res = scy - .query_iter( - "select ts_lsp, kind from channel_status where series = ? and ts_msp = ?", - params, - ) - .await - .err_conv()?; - while let Some(row) = res.next().await { - let row = row.err_conv()?; - let (ts_lsp, kind): (i64, i32) = row.into_typed().err_conv()?; - let ts = ts_msp + ts_lsp as u64; - let kind = kind as u32; - if ts >= evq.range().beg && ts < evq.range().end { - ret.push((ts, kind)); - } - } - ts_msp += div; - if ts_msp >= evq.range().end { - break; - } - } - Ok(ret) -} diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 488da83..3c4e6ff 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -15,8 +15,8 @@ pub async fn gen_test_data() -> Result<(), Error> { let data_base_path = PathBuf::from(homedir).join("daqbuffer-testdata").join("databuffer"); let ksprefix = String::from("ks"); let mut ensemble = Ensemble { - nodes: vec![], - channels: vec![], + nodes: Vec::new(), + channels: Vec::new(), }; { let chn = ChannelGenProps { diff --git a/err/src/lib.rs b/err/src/lib.rs index 4899322..0cc8728 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -89,7 +89,7 @@ impl Error { pub fn add_public_msg(mut self, msg: impl Into) -> Self { if self.public_msg.is_none() { - self.public_msg = Some(vec![]); + self.public_msg = Some(Vec::new()); } self.public_msg.as_mut().unwrap().push(msg.into()); self @@ -120,7 +120,7 @@ impl Error { fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { use std::io::Write; - let mut buf = vec![]; + let mut buf = Vec::new(); let mut c1 = 0; 'outer: for fr in trace.frames() { for sy in fr.symbols() { diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 9ee80ae..64ed8b2 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -8,24 +8,26 @@ edition = "2021" path = "src/httpret.rs" [dependencies] -serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" http = "0.2" url = "2.2" -tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.23.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } -hyper-tls = { version="0.5.0" } -bytes = "1.0.1" +tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } +bytes = "1.3.0" futures-util = "0.3.14" tracing = "0.1" tracing-futures = "0.2" async-channel = "1.6" itertools = "0.10.1" -chrono = "0.4.19" +chrono = "0.4.23" +scylla = "0.6.1" +md-5 = "0.10" +regex = "1.7" err = { path = "../err" } netpod = { path = "../netpod" } dbconn = { path = "../dbconn" } -tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } disk = { path = "../disk" } items = { path = "../items" } items_0 = { path = "../items_0" } @@ -36,6 +38,3 @@ nodenet = { path = "../nodenet" } commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } scyllaconn = { path = "../scyllaconn" } -scylla = "0.5" -md-5 = "0.9" -regex = "1.6" diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index b94d0ed..1a73d78 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -1,8 +1,10 @@ use crate::bodystream::response; use crate::err::Error; use crate::ReqCtx; +use futures_util::StreamExt; use http::{Method, Request, Response, StatusCode}; use hyper::Body; +use items_2::channelevents::ConnStatusEvent; use netpod::query::ChannelStateEventsQuery; use netpod::{FromUrl, NodeConfigCached, ACCEPT_ALL, APP_JSON}; use url::Url; @@ -53,23 +55,29 @@ impl ConnectionStatusEvents { &self, q: &ChannelStateEventsQuery, node_config: &NodeConfigCached, - ) -> Result, Error> { + ) -> Result, Error> { let scyco = node_config .node_config .cluster .scylla .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let ret = dbconn::events_scylla::channel_state_events(q, scyco).await?; + let scy = scyllaconn::create_scy_session(scyco).await?; + let mut stream = scyllaconn::events::channel_state_events(q, scy).await?; + let mut ret = Vec::new(); + while let Some(item) = stream.next().await { + let item = item?; + ret.push(item); + } Ok(ret) } } -pub struct ChannelConnectionStatusEvents {} +pub struct ChannelStatusEvents {} -impl ChannelConnectionStatusEvents { +impl ChannelStatusEvents { pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/scylla/channel/connection/status/events" { + if req.uri().path() == "/api/4/scylla/channel/status/events" { Some(Self {}) } else { None @@ -111,14 +119,20 @@ impl ChannelConnectionStatusEvents { &self, q: &ChannelStateEventsQuery, node_config: &NodeConfigCached, - ) -> Result, Error> { + ) -> Result, Error> { let scyco = node_config .node_config .cluster .scylla .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let ret = dbconn::events_scylla::channel_state_events(q, scyco).await?; + let scy = scyllaconn::create_scy_session(scyco).await?; + let mut stream = scyllaconn::events::channel_state_events(q, scy).await?; + let mut ret = Vec::new(); + while let Some(item) = stream.next().await { + let item = item?; + ret.push(item); + } Ok(ret) } } diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 465014b..f39a4fc 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -1,15 +1,17 @@ use crate::err::Error; use crate::{response, ToPublicResponse}; -use dbconn::channelconfig::{chconf_from_database, ChConf}; -use dbconn::{create_connection, create_scylla_connection}; +use dbconn::channelconfig::chconf_from_database; +use dbconn::channelconfig::ChConf; +use dbconn::create_connection; use futures_util::StreamExt; use http::{Method, Request, Response, StatusCode}; use hyper::Body; +use netpod::get_url_query_pairs; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; use netpod::query::{BinnedQuery, PlainEventsQuery}; use netpod::timeunits::*; -use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape}; +use netpod::{Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape}; use netpod::{ChannelConfigResponse, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON}; use scylla::batch::Consistency; @@ -221,7 +223,9 @@ impl ScyllaConfigsHisto { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { - let res = self.make_histo(node_config).await?; + let res = self + .make_histo(&node_config.node_config.cluster.backend, node_config) + .await?; let body = Body::from(serde_json::to_vec(&res)?); Ok(response(StatusCode::OK).body(body)?) } else { @@ -232,7 +236,7 @@ impl ScyllaConfigsHisto { } } - async fn make_histo(&self, node_config: &NodeConfigCached) -> Result { + async fn make_histo(&self, backend: &str, node_config: &NodeConfigCached) -> Result { let scyco = node_config .node_config .cluster @@ -246,7 +250,6 @@ impl ScyllaConfigsHisto { .build() .await .err_conv()?; - let backend = "scylla"; let res = scy .query( "select scalar_type, shape_dims, series from series_by_channel where facility = ? allow filtering", @@ -336,7 +339,9 @@ impl ScyllaChannelsWithType { if accept == APP_JSON || accept == ACCEPT_ALL { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelsWithTypeQuery::from_url(&url)?; - let res = self.get_channels(&q, node_config).await?; + let res = self + .get_channels(&q, &node_config.node_config.cluster.backend, node_config) + .await?; let body = Body::from(serde_json::to_vec(&res)?); Ok(response(StatusCode::OK).body(body)?) } else { @@ -350,6 +355,7 @@ impl ScyllaChannelsWithType { async fn get_channels( &self, q: &ChannelsWithTypeQuery, + backend: &str, node_config: &NodeConfigCached, ) -> Result { let scyco = node_config @@ -365,7 +371,6 @@ impl ScyllaChannelsWithType { .build() .await .err_conv()?; - let backend = "scylla"; let res = scy .query( "select channel_name, series from series_by_channel where facility = ? and scalar_type = ? and shape_dims = ? allow filtering", @@ -1160,7 +1165,7 @@ impl GenerateScyllaTestData { let dbconf = &node_config.node_config.cluster.database; let _pg_client = create_connection(dbconf).await?; let scyconf = node_config.node_config.cluster.scylla.as_ref().unwrap(); - let scy = create_scylla_connection(scyconf).await?; + let scy = scyllaconn::create_scy_session(scyconf).await?; let series: u64 = 42001; // TODO query `ts_msp` for all MSP values und use that to delete from event table first. // Only later delete also from the `ts_msp` table. diff --git a/httpret/src/events.rs b/httpret/src/events.rs index fe2d22d..7b8688b 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -1,22 +1,14 @@ use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json}; use crate::err::Error; -use crate::{response, response_err, BodyStream, ReqCtx, ToPublicResponse}; -use futures_util::{stream, Stream, StreamExt, TryStreamExt}; +use crate::{response, response_err, BodyStream, ToPublicResponse}; +use futures_util::{stream, TryStreamExt}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; -use items_2::channelevents::ChannelEvents; -use items_2::merger_cev::ChannelEventsMerger; -use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2}; use netpod::log::*; -use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery}; -use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached}; +use netpod::query::PlainEventsQuery; +use netpod::FromUrl; +use netpod::NodeConfigCached; use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; -use scyllaconn::create_scy_session; -use scyllaconn::errconv::ErrConv; -use scyllaconn::events::{channel_state_events, make_scylla_stream}; -use std::pin::Pin; -use std::sync::Arc; -use std::time::{Duration, Instant}; use url::Url; pub struct EventsHandler {} @@ -116,306 +108,3 @@ async fn plain_events_json( let ret = response(StatusCode::OK).body(Body::from(buf))?; Ok(ret) } - -pub struct EventsHandlerScylla {} - -impl EventsHandlerScylla { - pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/scylla/events" { - Some(Self {}) - } else { - None - } - } - - pub async fn handle( - &self, - req: Request, - ctx: &ReqCtx, - node_config: &NodeConfigCached, - ) -> Result, Error> { - if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); - } - match self.fetch(req, ctx, node_config).await { - Ok(ret) => Ok(ret), - Err(e) => Ok(e.to_public_response()), - } - } - - async fn fetch( - &self, - req: Request, - ctx: &ReqCtx, - node_config: &NodeConfigCached, - ) -> Result, Error> { - info!("EventsHandlerScylla req: {:?}", req); - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - Ok(self.gather(req, ctx, node_config).await?) - } else { - let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?; - Ok(ret) - } - } - - async fn gather( - &self, - req: Request, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, - ) -> Result, Error> { - let self_name = std::any::type_name::(); - let (head, _body) = req.into_parts(); - warn!("TODO PlainEventsQuery needs to take AggKind to do x-binning"); - let s1 = format!("dummy:{}", head.uri); - let url = Url::parse(&s1)?; - let evq = PlainEventsQuery::from_url(&url)?; - let deadline = Instant::now() + evq.timeout(); - let pgclient = { - // TODO use common connection/pool: - info!("--------------- open postgres connection"); - let pgconf = &node_config.node_config.cluster.database; - let u = { - let d = &pgconf; - format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name) - }; - let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?; - tokio::spawn(pgconn); - let pgclient = Arc::new(pgclient); - pgclient - }; - let scyco = if let Some(scyco) = &node_config.node_config.cluster.scylla { - scyco - } else { - return Err(Error::with_public_msg(format!("no scylla configured"))); - }; - let scy = create_scy_session(scyco).await?; - let do_one_before_range = evq.agg_kind().need_expand(); - let (series, scalar_type, shape) = dbconn::find_series(evq.channel(), pgclient.clone()).await?; - let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar); - let empty_stream = - futures_util::stream::once(futures_util::future::ready(Ok(ChannelEvents::Events(empty_item)))); - let stream2 = make_scylla_stream( - &evq, - do_one_before_range, - series, - scalar_type.clone(), - shape.clone(), - scy, - false, - ) - .await?; - let mut stream = empty_stream.chain(stream2); - let mut coll = None; - let mut fut = None; - loop { - // Alternative way, test what works better: - if fut.is_none() { - fut = Some(stream.next()); - } - let item = match tokio::time::timeout_at(deadline.into(), fut.as_mut().unwrap()).await { - Ok(Some(item)) => { - fut.take(); - item - } - Ok(None) => { - fut.take(); - break; - } - Err(_) => { - warn!("{self_name} timeout"); - fut.take(); - break; - } - }; - match item { - Ok(k) => match k { - ChannelEvents::Events(mut item) => { - if coll.is_none() { - coll = Some(items_0::collect_s::Collectable::new_collector(item.as_ref())); - } - let cl = coll.as_mut().unwrap(); - cl.ingest(item.as_collectable_mut()); - } - ChannelEvents::Status(..) => {} - }, - Err(e) => { - return Err(e.into()); - } - } - } - match coll { - Some(mut coll) => { - let res = coll.result()?; - let res = res.to_json_result()?; - let res = res.to_json_bytes()?; - let ret = response(StatusCode::OK).body(Body::from(res))?; - Ok(ret) - } - None => { - error!("should never happen with changed logic, remove case"); - err::todo(); - let item = empty_events_dyn(&scalar_type, &shape, &AggKind::TimeWeightedScalar); - let res = item.to_box_to_json_result(); - let res = res.to_json_result()?; - let res = res.to_json_bytes()?; - let ret = response(StatusCode::OK).body(Body::from(res))?; - Ok(ret) - } - } - } -} - -pub struct BinnedHandlerScylla {} - -impl BinnedHandlerScylla { - pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/scylla/binned" { - Some(Self {}) - } else { - None - } - } - - pub async fn handle( - &self, - req: Request, - ctx: &ReqCtx, - node_config: &NodeConfigCached, - ) -> Result, Error> { - if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); - } - match self.fetch(req, ctx, node_config).await { - Ok(ret) => Ok(ret), - Err(e) => { - eprintln!("error: {e}"); - Ok(e.to_public_response()) - } - } - } - - async fn fetch( - &self, - req: Request, - ctx: &ReqCtx, - node_config: &NodeConfigCached, - ) -> Result, Error> { - info!("BinnedHandlerScylla req: {:?}", req); - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - Ok(self.gather(req, ctx, node_config).await?) - } else { - let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?; - Ok(ret) - } - } - - async fn gather( - &self, - req: Request, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, - ) -> Result, Error> { - let (head, _body) = req.into_parts(); - warn!("TODO BinnedQuery needs to take AggKind to do x-binngin"); - let s1 = format!("dummy:{}", head.uri); - let url = Url::parse(&s1)?; - let evq = BinnedQuery::from_url(&url)?; - let do_one_before_range = evq.agg_kind().need_expand(); - let pgclient = { - // TODO use common connection/pool: - info!("--------------- open postgres connection"); - let pgconf = &node_config.node_config.cluster.database; - let u = { - let d = &pgconf; - format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name) - }; - let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?; - tokio::spawn(pgconn); - let pgclient = Arc::new(pgclient); - pgclient - }; - if let Some(scyco) = &node_config.node_config.cluster.scylla { - let scy = create_scy_session(scyco).await?; - let covering = BinnedRange::covering_range(evq.range().clone(), evq.bin_count())?; - let range = covering.full_range(); - let mut query2 = PlainEventsQuery::new( - evq.channel().clone(), - range.clone(), - evq.agg_kind().clone(), - Duration::from_millis(6000), - None, - false, - ); - query2.set_timeout(evq.timeout()); - let query2 = query2; - let (series, scalar_type, shape) = dbconn::find_series(evq.channel(), pgclient.clone()).await?; - let stream = make_scylla_stream( - &query2, - do_one_before_range, - series, - scalar_type.clone(), - shape.clone(), - scy.clone(), - false, - ) - .await?; - let query3 = ChannelStateEventsQuery::new(evq.channel().clone(), range.clone()); - let state_stream = channel_state_events(&query3, scy.clone()) - .await? - .map(|x| { - //eprintln!("state_stream {x:?}"); - x - }) - .map_err(|e| items_2::Error::from(format!("{e}"))); - // TODO let the stream itself use the items_2 error, do not convert here. - let data_stream = stream - .map(|x| { - //eprintln!("data_stream {x:?}"); - x - }) - .map_err(|e| items_2::Error::from(format!("{e}"))); - error!("TODO BinnedHandlerScylla::gather"); - err::todo(); - type Items = Pin> + Send>>; - let _data_stream = Box::pin(data_stream) as Items; - let _sate_stream = Box::pin(state_stream) as Items; - let merged_stream = ChannelEventsMerger::new(err::todoval()); - let _ = merged_stream; - //let merged_stream = ChannelEventsMerger::new(vec![data_stream, state_stream]); - let merged_stream = Box::pin(merged_stream) as Pin + Send>>; - let binned_collected = binned_collected( - scalar_type.clone(), - shape.clone(), - evq.agg_kind().clone(), - covering.edges(), - evq.timeout(), - merged_stream, - ) - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e}")))?; - let res = binned_collected.to_json_result()?; - let res = res.to_json_bytes()?; - let ret = response(StatusCode::OK).body(Body::from(res))?; - { - let _ret = response(StatusCode::OK).body(BodyStream::wrapped( - futures_util::stream::iter([Ok(Vec::new())]), - format!("gather"), - ))?; - } - Ok(ret) - } else { - return Err(Error::with_public_msg(format!("no scylla configured"))); - } - } -} diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs index efc40f3..c030f04 100644 --- a/httpret/src/gather.rs +++ b/httpret/src/gather.rs @@ -3,9 +3,9 @@ use crate::response; use futures_util::{select, FutureExt}; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; -use hyper_tls::HttpsConnector; use netpod::log::*; -use netpod::{Node, NodeConfigCached, APP_JSON}; +use netpod::APP_JSON; +use netpod::{Node, NodeConfigCached}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::future::Future; @@ -175,6 +175,7 @@ pub async fn gather_get_json_generic( tags: Vec, nt: NT, ft: FT, + // TODO use deadline instead timeout: Duration, ) -> Result, Error> where @@ -198,7 +199,6 @@ where .zip(tags.into_iter()) .map(move |((url, body), tag)| { let url_str = url.as_str(); - let is_tls = if url_str.starts_with("https://") { true } else { false }; let req = if body.is_some() { Request::builder().method(Method::POST).uri(url_str) } else { @@ -210,7 +210,6 @@ where } else { req }; - //let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name)); let body = match body { None => Body::empty(), Some(body) => body, @@ -222,15 +221,8 @@ where Err(Error::with_msg("timeout")) } res = { - if is_tls { - let https = HttpsConnector::new(); - let client = Client::builder().build::<_, hyper::Body>(https); - client.request(req?).fuse() - } - else { - let client = Client::new(); - client.request(req?).fuse() - } + let client = Client::new(); + client.request(req?).fuse() } => { let ret = nt(tag, res?).await?; Ok(ret) diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 1629340..be79ac6 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -312,13 +312,9 @@ async fn http_service_inner( h.handle(req, &node_config).await } else if let Some(h) = events::EventsHandler::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = events::EventsHandlerScylla::handler(&req) { - h.handle(req, ctx, &node_config).await - } else if let Some(h) = events::BinnedHandlerScylla::handler(&req) { - h.handle(req, ctx, &node_config).await } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { h.handle(req, ctx, &node_config).await - } else if let Some(h) = channel_status::ChannelConnectionStatusEvents::handler(&req) { + } else if let Some(h) = channel_status::ChannelStatusEvents::handler(&req) { h.handle(req, ctx, &node_config).await } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { h.handle(req, &node_config).await @@ -497,7 +493,7 @@ async fn node_status( let (_head, _body) = req.into_parts(); let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() { Some(k) => { - let mut st = vec![]; + let mut st = Vec::new(); for p in &k.data_base_paths { let _m = match tokio::fs::metadata(p).await { Ok(m) => m, @@ -718,7 +714,7 @@ impl StatusBoardEntry { ts_updated: SystemTime::now(), is_error: false, is_ok: false, - errors: vec![], + errors: Vec::new(), } } } @@ -808,7 +804,7 @@ impl StatusBoard { match self.entries.get(status_id) { Some(e) => { if e.is_ok { - let js = StatJs { errors: vec![] }; + let js = StatJs { errors: Vec::new() }; return serde_json::to_string(&js).unwrap(); } else if e.is_error { let errors = e.errors.iter().map(|e| (&e.0).into()).collect(); @@ -816,7 +812,7 @@ impl StatusBoard { return serde_json::to_string(&js).unwrap(); } else { warn!("requestStatus for unfinished {status_id}"); - let js = StatJs { errors: vec![] }; + let js = StatJs { errors: Vec::new() }; return serde_json::to_string(&js).unwrap(); } } diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 4fb8d9d..c2ff873 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -9,7 +9,6 @@ use futures_util::{pin_mut, Stream}; use http::{Method, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; -use hyper_tls::HttpsConnector; use itertools::Itertools; use netpod::log::*; use netpod::query::{BinnedQuery, PlainEventsQuery}; @@ -458,14 +457,8 @@ pub async fn proxy_api1_map_pulse( let sh = &g.url; let url = format!("{}/api/1/map/pulse/{}", sh, pulseid); let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?; - let res = if sh.starts_with("https") { - let https = HttpsConnector::new(); - let c = hyper::Client::builder().build(https); - c.request(req).await? - } else { - let c = hyper::Client::new(); - c.request(req).await? - }; + let c = hyper::Client::new(); + let res = c.request(req).await?; let ret = response(StatusCode::OK).body(res.into_body())?; Ok(ret) } diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 166ea35..74a264c 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -557,7 +557,7 @@ impl MapPulseScyllaHandler { } } - pub async fn handle(&self, req: Request, _node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } @@ -565,13 +565,12 @@ impl MapPulseScyllaHandler { let url = url::Url::parse(&urls)?; let query = MapPulseQuery::from_url(&url)?; let pulse = query.pulse; - let scy = scylla::SessionBuilder::new() - .known_node("sf-daqbuf-34:19042") - .default_consistency(scylla::batch::Consistency::One) - .use_keyspace("ks1", false) - .build() - .await - .err_conv()?; + let scyconf = if let Some(x) = node_config.node_config.cluster.scylla.as_ref() { + x + } else { + return Err(Error::with_public_msg_no_trace("no scylla configured")); + }; + let scy = scyllaconn::create_scy_session(&scyconf).await?; let pulse_a = (pulse >> 14) as i64; let pulse_b = (pulse & 0x3fff) as i32; let res = scy @@ -582,8 +581,9 @@ impl MapPulseScyllaHandler { .await .err_conv()?; let rows = res.rows().err_conv()?; - let mut tss = vec![]; - let mut channels = vec![]; + let ch = "pulsemaptable"; + let mut tss = Vec::new(); + let mut channels = Vec::new(); use scylla::frame::response::result::CqlValue; let ts_a_def = CqlValue::BigInt(0); let ts_b_def = CqlValue::Int(0); @@ -591,7 +591,7 @@ impl MapPulseScyllaHandler { let ts_a = row.columns[0].as_ref().unwrap_or(&ts_a_def).as_bigint().unwrap_or(0) as u64; let ts_b = row.columns[1].as_ref().unwrap_or(&ts_b_def).as_int().unwrap_or(0) as u32 as u64; tss.push(ts_a * netpod::timeunits::SEC + ts_b); - channels.push("scylla".into()); + channels.push(ch.into()); } let ret = LocalMap { pulse, tss, channels }; Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index 647b68b..7682e19 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -22,7 +22,7 @@ byteorder = "1.4.3" futures-util = "0.3.14" tracing = "0.1.25" hex = "0.4.3" -scylla = "0.5" +scylla = "0.6.1" tokio-postgres = "0.7.7" err = { path = "../err" } netpod = { path = "../netpod" } diff --git a/scyllaconn/Cargo.toml b/scyllaconn/Cargo.toml index 1b3cdcc..7842738 100644 --- a/scyllaconn/Cargo.toml +++ b/scyllaconn/Cargo.toml @@ -12,7 +12,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.2" erased-serde = "0.3" -tokio = { version = "1.20.1", default-features = false, features = ["time", "sync"] } +tokio = { version = "1.23.0", default-features = false, features = ["time", "sync"] } byteorder = "1.4.3" bytes = "1.2.1" num-traits = "0.2.15" @@ -20,7 +20,7 @@ chrono = { version = "0.4.19", features = ["serde"] } crc32fast = "1.3.2" futures-util = "0.3.24" async-channel = "1.7.1" -scylla = "0.5" +scylla = "0.6.1" tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] } err = { path = "../err" } netpod = { path = "../netpod" } diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index 6230931..fe4b280 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -582,7 +582,7 @@ pub async fn make_scylla_stream( pub async fn channel_state_events( evq: &ChannelStateEventsQuery, scy: Arc, -) -> Result> + Send>>, Error> { +) -> Result> + Send>>, Error> { let (tx, rx) = async_channel::bounded(8); let evq = evq.clone(); let fut = async move { @@ -618,9 +618,7 @@ pub async fn channel_state_events( } }; let ev = ConnStatusEvent { ts, status }; - tx.send(Ok(ChannelEvents::Status(ev))) - .await - .map_err(|e| format!("{e}"))?; + tx.send(Ok(ev)).await.map_err(|e| format!("{e}"))?; } } ts_msp += div;