From 4a3f8986fe305fbdd355047c486b904c0fb7668d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 1 Sep 2022 18:53:00 +0200 Subject: [PATCH] Refactor (WIP) the event fetch pipeline --- Cargo.toml | 2 +- daqbufp2/src/test/binnedjson.rs | 3 +- daqbufp2/src/test/events.rs | 2 +- dbconn/Cargo.toml | 16 +- disk/src/channelexec.rs | 3 +- disk/src/disk.rs | 1 - disk/src/events.rs | 188 --------- httpret/Cargo.toml | 4 +- httpret/src/channelconfig.rs | 3 +- httpret/src/events.rs | 112 +++++- httpret/src/evinfo.rs | 12 +- httpret/src/httpret.rs | 2 + httpret/src/proxy.rs | 3 +- items/Cargo.toml | 10 +- items/src/binnernew.rs | 11 - items/src/items.rs | 1 - items_2/Cargo.toml | 22 ++ items_2/src/binsdim0.rs | 538 ++++++++++++++++++++++++++ items_2/src/eventsdim0.rs | 660 ++++++++++++++++++++++++++++++++ items_2/src/items_2.rs | 456 ++++++++++++++++++++++ items_2/src/streams.rs | 77 ++++ netpod/src/query.rs | 176 +++++++++ nodenet/Cargo.toml | 4 +- scyllaconn/Cargo.toml | 27 ++ scyllaconn/src/bincache.rs | 468 ++++++++++++++++++++++ scyllaconn/src/errconv.rs | 51 +++ scyllaconn/src/events.rs | 606 +++++++++++++++++++++++++++++ scyllaconn/src/scyllaconn.rs | 20 + 28 files changed, 3239 insertions(+), 239 deletions(-) delete mode 100644 disk/src/events.rs delete mode 100644 items/src/binnernew.rs create mode 100644 items_2/Cargo.toml create mode 100644 items_2/src/binsdim0.rs create mode 100644 items_2/src/eventsdim0.rs create mode 100644 items_2/src/items_2.rs create mode 100644 items_2/src/streams.rs create mode 100644 scyllaconn/Cargo.toml create mode 100644 scyllaconn/src/bincache.rs create mode 100644 scyllaconn/src/errconv.rs create mode 100644 scyllaconn/src/events.rs create mode 100644 scyllaconn/src/scyllaconn.rs diff --git a/Cargo.toml b/Cargo.toml index 8259feb..0fb171f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["daqbuffer", "h5out", "items", "items_proc", "nodenet", "httpclient", "fsio", "dq"] +members = ["daqbuffer", "httpret", "h5out", "items", "items_2", "items_proc", "nodenet", "httpclient", "fsio", "dq"] [profile.release] opt-level = 1 diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs index cd714dc..5bd5ecc 100644 --- a/daqbufp2/src/test/binnedjson.rs +++ b/daqbufp2/src/test/binnedjson.rs @@ -3,12 +3,11 @@ mod channelarchiver; use crate::err::ErrConv; use crate::nodes::{require_sls_test_host_running, require_test_hosts_running}; use chrono::{DateTime, Utc}; -use disk::events::PlainEventsQuery; use err::Error; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::query::{BinnedQuery, CacheUsage}; +use netpod::query::{BinnedQuery, CacheUsage, PlainEventsQuery}; use netpod::{f64_close, AppendToUrl}; use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON}; use serde::{Deserialize, Serialize}; diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 1dd3884..a528f5b 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -1,7 +1,6 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; -use disk::events::PlainEventsQuery; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; @@ -12,6 +11,7 @@ use items::numops::NumOps; use items::scalarevents::ScalarEvents; use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen}; use netpod::log::*; +use netpod::query::PlainEventsQuery; use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET}; use serde_json::Value as JsonValue; use std::fmt::Debug; diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index 5606f12..bea4d94 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -10,19 +10,19 @@ path = "src/dbconn.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.20.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] } tracing = "0.1.25" crc32fast = "1.2.1" arrayref = "0.3.6" -byteorder = "1.4.3" -futures-core = "0.3.14" -futures-util = "0.3.14" -bytes = "1.0.1" -pin-project = "1.0.7" +byteorder = "1.4" +futures-core = "0.3.24" +futures-util = "0.3.24" +bytes = "1.2" +pin-project = "1" #async-channel = "1" #dashmap = "3" -tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } -scylla = "0.4.4" +scylla = "0.5" async-channel = "1.6" chrono = "0.4" regex = "1.5.4" diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 2cfd3f9..d637d9d 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -3,7 +3,6 @@ use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, LittleEndian, NumFromBytes, }; -use crate::events::PlainEventsQuery; use crate::merge::mergedfromremotes::MergedFromRemotes; use bytes::Bytes; use err::Error; @@ -18,7 +17,7 @@ use items::{ TimeBinnableType, }; use netpod::log::*; -use netpod::query::RawEventsQuery; +use netpod::query::{PlainEventsQuery, RawEventsQuery}; use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; use serde::de::DeserializeOwned; use serde_json::Value as JsonValue; diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 5d94b6d..5c5fcbc 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -9,7 +9,6 @@ pub mod dataopen; pub mod decode; pub mod eventblobs; pub mod eventchunker; -pub mod events; pub mod frame; pub mod gen; pub mod index; diff --git a/disk/src/events.rs b/disk/src/events.rs deleted file mode 100644 index 3f07fb5..0000000 --- a/disk/src/events.rs +++ /dev/null @@ -1,188 +0,0 @@ -use chrono::{DateTime, TimeZone, Utc}; -use err::Error; -use netpod::get_url_query_pairs; -use netpod::{AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos}; -use std::time::Duration; -use url::Url; - -// TODO move this query type out of this `binned` mod -#[derive(Clone, Debug)] -pub struct PlainEventsQuery { - channel: Channel, - range: NanoRange, - disk_io_buffer_size: usize, - report_error: bool, - timeout: Duration, - events_max: Option, - do_log: bool, - do_test_main_error: bool, - do_test_stream_error: bool, -} - -impl PlainEventsQuery { - pub fn new( - channel: Channel, - range: NanoRange, - disk_io_buffer_size: usize, - events_max: Option, - do_log: bool, - ) -> Self { - Self { - channel, - range, - disk_io_buffer_size, - report_error: false, - timeout: Duration::from_millis(10000), - events_max, - do_log, - do_test_main_error: false, - do_test_stream_error: false, - } - } - - pub fn from_request_head(head: &http::request::Parts) -> Result { - let s1 = format!("dummy:{}", head.uri); - let url = Url::parse(&s1)?; - Self::from_url(&url) - } - - pub fn channel(&self) -> &Channel { - &self.channel - } - - pub fn range(&self) -> &NanoRange { - &self.range - } - - pub fn report_error(&self) -> bool { - self.report_error - } - - pub fn disk_io_buffer_size(&self) -> usize { - self.disk_io_buffer_size - } - - pub fn timeout(&self) -> Duration { - self.timeout - } - - pub fn events_max(&self) -> Option { - self.events_max - } - - pub fn do_log(&self) -> bool { - self.do_log - } - - pub fn do_test_main_error(&self) -> bool { - self.do_test_main_error - } - - pub fn do_test_stream_error(&self) -> bool { - self.do_test_stream_error - } - - pub fn set_series_id(&mut self, series: u64) { - self.channel.series = Some(series); - } - - pub fn set_timeout(&mut self, k: Duration) { - self.timeout = k; - } - - pub fn set_do_test_main_error(&mut self, k: bool) { - self.do_test_main_error = k; - } - - pub fn set_do_test_stream_error(&mut self, k: bool) { - self.do_test_stream_error = k; - } -} - -impl HasBackend for PlainEventsQuery { - fn backend(&self) -> &str { - &self.channel.backend - } -} - -impl HasTimeout for PlainEventsQuery { - fn timeout(&self) -> Duration { - self.timeout.clone() - } -} - -impl FromUrl for PlainEventsQuery { - fn from_url(url: &Url) -> Result { - let pairs = get_url_query_pairs(url); - Self::from_pairs(&pairs) - } - - fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { - let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?; - let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?; - let ret = Self { - channel: Channel::from_pairs(&pairs)?, - range: NanoRange { - beg: beg_date.parse::>()?.to_nanos(), - end: end_date.parse::>()?.to_nanos(), - }, - disk_io_buffer_size: pairs - .get("diskIoBufferSize") - .map_or("4096", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, - report_error: pairs - .get("reportError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?, - timeout: pairs - .get("timeout") - .map_or("10000", |k| k) - .parse::() - .map(|k| Duration::from_millis(k)) - .map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?, - events_max: pairs - .get("eventsMax") - .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, - do_log: pairs - .get("doLog") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?, - do_test_main_error: pairs - .get("doTestMainError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError {:?}", e)))?, - do_test_stream_error: pairs - .get("doTestStreamError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?, - }; - Ok(ret) - } -} - -impl AppendToUrl for PlainEventsQuery { - fn append_to_url(&self, url: &mut Url) { - let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - self.channel.append_to_url(url); - let mut g = url.query_pairs_mut(); - g.append_pair( - "begDate", - &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), - ); - g.append_pair( - "endDate", - &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), - ); - g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); - g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); - if let Some(x) = self.events_max.as_ref() { - g.append_pair("eventsMax", &format!("{}", x)); - } - g.append_pair("doLog", &format!("{}", self.do_log)); - } -} diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 07d80fa..9d35da3 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -29,10 +29,12 @@ dbconn = { path = "../dbconn" } tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } disk = { path = "../disk" } items = { path = "../items" } +items_2 = { path = "../items_2" } parse = { path = "../parse" } nodenet = { path = "../nodenet" } commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } -scylla = "0.4" +scyllaconn = { path = "../scyllaconn" } +scylla = "0.5" md-5 = "0.9" regex = "1.6" diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index e4ad308..3dac6bf 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -2,12 +2,11 @@ use crate::err::Error; use crate::{response, ToPublicResponse}; use dbconn::{create_connection, create_scylla_connection}; use disk::binned::query::PreBinnedQuery; -use disk::events::PlainEventsQuery; use futures_util::StreamExt; use http::{Method, Request, Response, StatusCode}; use hyper::Body; use netpod::log::*; -use netpod::query::BinnedQuery; +use netpod::query::{BinnedQuery, PlainEventsQuery}; use netpod::timeunits::*; use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape}; use netpod::{ChannelConfigResponse, NodeConfigCached}; diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 94285c4..27715a7 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -1,13 +1,19 @@ +use std::sync::Arc; + use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json}; use crate::err::Error; use crate::{response, response_err, BodyStream, ToPublicResponse}; -use disk::events::PlainEventsQuery; use futures_util::{StreamExt, TryStreamExt}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; +use items_2::ChannelEvents; use netpod::log::*; +use netpod::query::PlainEventsQuery; use netpod::{AggKind, FromUrl, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; +use scyllaconn::create_scy_session; +use scyllaconn::errconv::ErrConv; +use scyllaconn::events::make_scylla_stream; use url::Url; pub struct EventsHandler {} @@ -83,7 +89,9 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { info!("httpret plain_events_json req: {:?}", req); let (head, _body) = req.into_parts(); - let query = PlainEventsQuery::from_request_head(&head)?; + let s1 = format!("dummy:{}", head.uri); + let url = Url::parse(&s1)?; + let query = PlainEventsQuery::from_url(&url)?; let chconf = chconf_from_events_json(&query, node_config).await?; // Update the series id since we don't require some unique identifier yet. @@ -118,3 +126,103 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - ))?; Ok(ret) } + +pub struct EventsHandlerScylla {} + +impl EventsHandlerScylla { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/scylla/events" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + match self.fetch(req, node_config).await { + Ok(ret) => Ok(ret), + Err(e) => Ok(e.to_public_response()), + } + } + + async fn fetch(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("EventsHandlerScylla req: {:?}", req); + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { + Ok(self.gather(req, node_config).await?) + } else { + let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?; + Ok(ret) + } + } + + async fn gather(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let (head, _body) = req.into_parts(); + warn!("TODO PlainEventsQuery needs to take AggKind"); + let s1 = format!("dummy:{}", head.uri); + let url = Url::parse(&s1)?; + let evq = PlainEventsQuery::from_url(&url)?; + let pgclient = { + // TODO use common connection/pool: + info!("--------------- open postgres connection"); + let pgconf = &node_config.node_config.cluster.database; + let u = { + let d = &pgconf; + format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name) + }; + let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?; + tokio::spawn(pgconn); + let pgclient = Arc::new(pgclient); + pgclient + }; + let mut stream = if let Some(scyco) = &node_config.node_config.cluster.scylla { + let scy = create_scy_session(scyco).await?; + let stream = make_scylla_stream(&evq, scy, &pgclient, false).await?; + stream + } else { + return Err(Error::with_public_msg(format!("no scylla configured"))); + }; + let mut coll = None; + while let Some(item) = stream.next().await { + match item { + Ok(k) => match k { + ChannelEvents::Events(mut item) => { + if coll.is_none() { + coll = Some(item.new_collector()); + } + let cl = coll.as_mut().unwrap(); + cl.ingest(item.as_collectable_mut()); + } + ChannelEvents::Status(..) => {} + ChannelEvents::RangeComplete => {} + }, + Err(e) => { + return Err(e.into()); + } + } + } + match coll { + Some(mut coll) => { + let res = coll.result()?; + let res = res.to_json_result()?; + let res = res.to_json_bytes()?; + let ret = response(StatusCode::OK).body(Body::from(res))?; + Ok(ret) + } + None => { + let ret = response(StatusCode::OK).body(BodyStream::wrapped( + futures_util::stream::iter([Ok(Vec::new())]), + format!("EventsHandlerScylla::gather"), + ))?; + Ok(ret) + } + } + } +} diff --git a/httpret/src/evinfo.rs b/httpret/src/evinfo.rs index 5e2d142..f5afcc7 100644 --- a/httpret/src/evinfo.rs +++ b/httpret/src/evinfo.rs @@ -9,7 +9,6 @@ use disk::decode::Endianness; use disk::decode::EventValueFromBytes; use disk::decode::EventValueShape; use disk::decode::NumFromBytes; -use disk::events::PlainEventsQuery; use disk::merge::mergedfromremotes::MergedFromRemotes; use futures_util::FutureExt; use futures_util::Stream; @@ -26,15 +25,8 @@ use items::PushableIndex; use items::Sitemty; use items::TimeBinnableType; use netpod::log::*; -use netpod::query::RawEventsQuery; -use netpod::AggKind; -use netpod::Channel; -use netpod::FromUrl; -use netpod::NanoRange; -use netpod::NodeConfigCached; -use netpod::PerfOpts; -use netpod::ScalarType; -use netpod::Shape; +use netpod::query::{PlainEventsQuery, RawEventsQuery}; +use netpod::{AggKind, Channel, FromUrl, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; use serde::de::DeserializeOwned; use std::fmt::Debug; use std::pin::Pin; diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index a2609aa..cd34f35 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -245,6 +245,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> h.handle(req, &node_config).await } else if let Some(h) = events::EventsHandler::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = events::EventsHandlerScylla::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = ChannelStatusConnectionEvents::handler(&req) { h.handle(req, &node_config).await } else if path == "/api/4/binned" { diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 37bf848..eac3da3 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -5,7 +5,6 @@ use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; use crate::pulsemap::MapPulseQuery; use crate::{api_1_docs, api_4_docs, response, response_err, Cont}; -use disk::events::PlainEventsQuery; use futures_core::Stream; use futures_util::pin_mut; use http::{Method, StatusCode}; @@ -14,7 +13,7 @@ use hyper::{Body, Request, Response, Server}; use hyper_tls::HttpsConnector; use itertools::Itertools; use netpod::log::*; -use netpod::query::BinnedQuery; +use netpod::query::{BinnedQuery, PlainEventsQuery}; use netpod::{ AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, FromUrl, HasBackend, HasTimeout, ProxyConfig, ACCEPT_ALL, APP_JSON, diff --git a/items/Cargo.toml b/items/Cargo.toml index fda7403..b25bc42 100644 --- a/items/Cargo.toml +++ b/items/Cargo.toml @@ -13,11 +13,11 @@ serde_json = "1.0" serde_cbor = "0.11.1" erased-serde = "0.3" bincode = "1.3.3" -bytes = "1.0.1" -num-traits = "0.2.14" -tokio = { version = "1.7.1", features = ["fs"] } -chrono = { version = "0.4.19", features = ["serde"] } -crc32fast = "1.2.1" +bytes = "1.2.1" +num-traits = "0.2.15" +chrono = { version = "0.4.22", features = ["serde"] } +crc32fast = "1.3.2" +tokio = { version = "1.20.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } err = { path = "../err" } items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } diff --git a/items/src/binnernew.rs b/items/src/binnernew.rs deleted file mode 100644 index f5a1d99..0000000 --- a/items/src/binnernew.rs +++ /dev/null @@ -1,11 +0,0 @@ -pub enum ConnStatus {} - -pub struct ConnStatusEvent { - ts: u64, - status: ConnStatus, -} - -pub enum ChannelEvents { - Status(ConnStatus), - Data(), -} diff --git a/items/src/items.rs b/items/src/items.rs index 3bca178..80a3085 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -1,5 +1,4 @@ pub mod binnedevents; -pub mod binnernew; pub mod binsdim0; pub mod binsdim1; pub mod eventsitem; diff --git a/items_2/Cargo.toml b/items_2/Cargo.toml new file mode 100644 index 0000000..eb8c34a --- /dev/null +++ b/items_2/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "items_2" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/items_2.rs" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_cbor = "0.11.2" +erased-serde = "0.3" +bytes = "1.2.1" +num-traits = "0.2.15" +chrono = { version = "0.4.19", features = ["serde"] } +crc32fast = "1.3.2" +futures-util = "0.3.24" +err = { path = "../err" } +items_proc = { path = "../items_proc" } +netpod = { path = "../netpod" } diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs new file mode 100644 index 0000000..888f1fd --- /dev/null +++ b/items_2/src/binsdim0.rs @@ -0,0 +1,538 @@ +use crate::streams::{CollectableType, CollectorType, ToJsonResult}; +use crate::{ + ts_offs_from_abs, AppendEmptyBin, Empty, IsoDateTime, ScalarOps, TimeBinnable, TimeBinnableType, + TimeBinnableTypeAggregator, TimeBinned, TimeBinner, TimeSeries, WithLen, +}; +use chrono::{TimeZone, Utc}; +use err::Error; +use netpod::log::*; +use netpod::timeunits::SEC; +use netpod::NanoRange; +use num_traits::Zero; +use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::{fmt, mem}; + +#[derive(Clone, Serialize, Deserialize)] +pub struct MinMaxAvgDim0Bins { + pub ts1s: Vec, + pub ts2s: Vec, + pub counts: Vec, + pub mins: Vec, + pub maxs: Vec, + pub avgs: Vec, +} + +impl fmt::Debug for MinMaxAvgDim0Bins +where + NTY: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let self_name = std::any::type_name::(); + write!( + fmt, + "{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", + self.ts1s.len(), + self.ts1s.iter().map(|k| k / SEC).collect::>(), + self.ts2s.iter().map(|k| k / SEC).collect::>(), + self.counts, + self.mins, + self.maxs, + self.avgs, + ) + } +} + +impl MinMaxAvgDim0Bins { + pub fn empty() -> Self { + Self { + ts1s: vec![], + ts2s: vec![], + counts: vec![], + mins: vec![], + maxs: vec![], + avgs: vec![], + } + } +} + +impl WithLen for MinMaxAvgDim0Bins { + fn len(&self) -> usize { + self.ts1s.len() + } +} + +impl Empty for MinMaxAvgDim0Bins { + fn empty() -> Self { + Self { + ts1s: Vec::new(), + ts2s: Vec::new(), + counts: Vec::new(), + mins: Vec::new(), + maxs: Vec::new(), + avgs: Vec::new(), + } + } +} + +impl AppendEmptyBin for MinMaxAvgDim0Bins { + fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { + self.ts1s.push(ts1); + self.ts2s.push(ts2); + self.counts.push(0); + self.mins.push(NTY::zero()); + self.maxs.push(NTY::zero()); + self.avgs.push(0.); + } +} + +impl TimeSeries for MinMaxAvgDim0Bins { + fn ts_min(&self) -> Option { + todo!("collection of bins can not be TimeSeries") + } + + fn ts_max(&self) -> Option { + todo!("collection of bins can not be TimeSeries") + } + + fn ts_min_max(&self) -> Option<(u64, u64)> { + todo!("collection of bins can not be TimeSeries") + } +} + +impl TimeBinnableType for MinMaxAvgDim0Bins { + type Output = MinMaxAvgDim0Bins; + type Aggregator = MinMaxAvgDim0BinsAggregator; + + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + let self_name = std::any::type_name::(); + debug!( + "TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); + Self::Aggregator::new(range, do_time_weight) + } +} + +pub struct MinMaxAvgBinsCollected { + _m1: PhantomData, +} + +impl MinMaxAvgBinsCollected { + pub fn new() -> Self { + Self { _m1: PhantomData } + } +} + +#[derive(Serialize)] +pub struct MinMaxAvgBinsCollectedResult { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: Vec, + #[serde(rename = "tsNs")] + ts_off_ns: Vec, + counts: Vec, + mins: Vec, + maxs: Vec, + avgs: Vec, + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + finalised_range: bool, + #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] + missing_bins: u32, + #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")] + continue_at: Option, +} + +impl ToJsonResult for MinMaxAvgBinsCollectedResult { + fn to_json_result(&self) -> Result, Error> { + let k = serde_json::to_value(self)?; + Ok(Box::new(k)) + } +} + +pub struct MinMaxAvgBinsCollector { + timed_out: bool, + range_complete: bool, + vals: MinMaxAvgDim0Bins, +} + +impl MinMaxAvgBinsCollector { + pub fn new() -> Self { + Self { + timed_out: false, + range_complete: false, + vals: MinMaxAvgDim0Bins::::empty(), + } + } +} + +impl WithLen for MinMaxAvgBinsCollector { + fn len(&self) -> usize { + self.vals.ts1s.len() + } +} + +impl CollectorType for MinMaxAvgBinsCollector { + type Input = MinMaxAvgDim0Bins; + type Output = MinMaxAvgBinsCollectedResult; + + fn ingest(&mut self, _src: &mut Self::Input) { + err::todo(); + } + + fn set_range_complete(&mut self) { + self.range_complete = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(&mut self) -> Result { + let bin_count = self.vals.ts1s.len() as u32; + // TODO could save the copy: + let mut ts_all = self.vals.ts1s.clone(); + if self.vals.ts2s.len() > 0 { + ts_all.push(*self.vals.ts2s.last().unwrap()); + } + info!("TODO return proper continueAt"); + let bin_count_exp = 100 as u32; + let continue_at = if self.vals.ts1s.len() < bin_count_exp as usize { + match ts_all.last() { + Some(&k) => { + let iso = IsoDateTime(Utc.timestamp_nanos(k as i64)); + Some(iso) + } + None => Err(Error::with_msg("partial_content but no bin in result"))?, + } + } else { + None + }; + let tst = ts_offs_from_abs(&ts_all); + let counts = mem::replace(&mut self.vals.counts, Vec::new()); + let mins = mem::replace(&mut self.vals.mins, Vec::new()); + let maxs = mem::replace(&mut self.vals.maxs, Vec::new()); + let avgs = mem::replace(&mut self.vals.avgs, Vec::new()); + let ret = MinMaxAvgBinsCollectedResult:: { + ts_anchor_sec: tst.0, + ts_off_ms: tst.1, + ts_off_ns: tst.2, + counts, + mins, + maxs, + avgs, + finalised_range: self.range_complete, + missing_bins: bin_count_exp - bin_count, + continue_at, + }; + Ok(ret) + } +} + +impl CollectableType for MinMaxAvgDim0Bins { + type Collector = MinMaxAvgBinsCollector; + + fn new_collector() -> Self::Collector { + Self::Collector::new() + } +} + +pub struct MinMaxAvgDim0BinsAggregator { + range: NanoRange, + count: u64, + min: NTY, + max: NTY, + // Carry over to next bin: + avg: f32, + sumc: u64, + sum: f32, +} + +impl MinMaxAvgDim0BinsAggregator { + pub fn new(range: NanoRange, _do_time_weight: bool) -> Self { + Self { + range, + count: 0, + min: NTY::zero(), + max: NTY::zero(), + avg: 0., + sumc: 0, + sum: 0f32, + } + } +} + +impl TimeBinnableTypeAggregator for MinMaxAvgDim0BinsAggregator { + type Input = MinMaxAvgDim0Bins; + type Output = MinMaxAvgDim0Bins; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + for i1 in 0..item.ts1s.len() { + if item.counts[i1] == 0 { + } else if item.ts2s[i1] <= self.range.beg { + } else if item.ts1s[i1] >= self.range.end { + } else { + if self.count == 0 { + self.min = item.mins[i1].clone(); + self.max = item.maxs[i1].clone(); + } else { + if self.min > item.mins[i1] { + self.min = item.mins[i1].clone(); + } + if self.max < item.maxs[i1] { + self.max = item.maxs[i1].clone(); + } + } + self.count += item.counts[i1]; + self.sum += item.avgs[i1]; + self.sumc += 1; + } + } + } + + fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output { + if self.sumc > 0 { + self.avg = self.sum / self.sumc as f32; + } + let ret = Self::Output { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![self.min.clone()], + maxs: vec![self.max.clone()], + avgs: vec![self.avg], + }; + self.range = range; + self.count = 0; + self.sum = 0f32; + self.sumc = 0; + ret + } +} + +impl TimeBinnable for MinMaxAvgDim0Bins { + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { + let ret = MinMaxAvgDim0BinsTimeBinner::::new(edges.into(), do_time_weight); + Box::new(ret) + } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } +} + +pub struct MinMaxAvgDim0BinsTimeBinner { + edges: VecDeque, + do_time_weight: bool, + agg: Option>, + ready: Option< as TimeBinnableTypeAggregator>::Output>, +} + +impl MinMaxAvgDim0BinsTimeBinner { + fn new(edges: VecDeque, do_time_weight: bool) -> Self { + Self { + edges, + do_time_weight, + agg: None, + ready: None, + } + } + + fn next_bin_range(&mut self) -> Option { + if self.edges.len() >= 2 { + let ret = NanoRange { + beg: self.edges[0], + end: self.edges[1], + }; + self.edges.pop_front(); + Some(ret) + } else { + None + } + } +} + +impl TimeBinner for MinMaxAvgDim0BinsTimeBinner { + fn ingest(&mut self, item: &dyn TimeBinnable) { + let self_name = std::any::type_name::(); + if item.len() == 0 { + // Return already here, RangeOverlapInfo would not give much sense. + return; + } + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {self_name} no more bin in edges A"); + return; + } + // TODO optimize by remembering at which event array index we have arrived. + // That needs modified interfaces which can take and yield the start and latest index. + loop { + while item.starts_after(NanoRange { + beg: 0, + end: self.edges[1], + }) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {self_name} no more bin in edges B"); + return; + } + } + if item.ends_before(NanoRange { + beg: self.edges[0], + end: u64::MAX, + }) { + return; + } else { + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {self_name} edge list exhausted"); + return; + } else { + let agg = if let Some(agg) = self.agg.as_mut() { + agg + } else { + self.agg = Some(MinMaxAvgDim0BinsAggregator::new( + // We know here that we have enough edges for another bin. + // and `next_bin_range` will pop the first edge. + self.next_bin_range().unwrap(), + self.do_time_weight, + )); + self.agg.as_mut().unwrap() + }; + if let Some(item) = item + .as_any() + // TODO make statically sure that we attempt to cast to the correct type here: + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + { + agg.ingest(item); + } else { + let tyid_item = std::any::Any::type_id(item.as_any()); + error!("not correct item type {:?}", tyid_item); + }; + if item.ends_after(agg.range().clone()) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {self_name} no more bin in edges C"); + return; + } + } else { + break; + } + } + } + } + } + + fn bins_ready_count(&self) -> usize { + match &self.ready { + Some(k) => k.len(), + None => 0, + } + } + + fn bins_ready(&mut self) -> Option> { + match self.ready.take() { + Some(k) => Some(Box::new(k)), + None => None, + } + } + + // TODO there is too much common code between implementors: + fn push_in_progress(&mut self, push_empty: bool) { + // TODO expand should be derived from AggKind. Is it still required after all? + let expand = true; + if let Some(agg) = self.agg.as_mut() { + let dummy_range = NanoRange { beg: 4, end: 5 }; + let bins = agg.result_reset(dummy_range, expand); + self.agg = None; + assert_eq!(bins.len(), 1); + if push_empty || bins.counts[0] != 0 { + match self.ready.as_mut() { + Some(_ready) => { + err::todo(); + //ready.append(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + } + } + } + + // TODO there is too much common code between implementors: + fn cycle(&mut self) { + let n = self.bins_ready_count(); + self.push_in_progress(true); + if self.bins_ready_count() == n { + if let Some(_range) = self.next_bin_range() { + let bins = MinMaxAvgDim0Bins::::empty(); + err::todo(); + //bins.append_zero(range.beg, range.end); + match self.ready.as_mut() { + Some(_ready) => { + err::todo(); + //ready.append(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + if self.bins_ready_count() <= n { + error!("failed to push a zero bin"); + } + } else { + warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); + } + } + } +} + +impl TimeBinned for MinMaxAvgDim0Bins { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable { + self as &dyn TimeBinnable + } + + fn edges_slice(&self) -> (&[u64], &[u64]) { + (&self.ts1s[..], &self.ts2s[..]) + } + + fn counts(&self) -> &[u64] { + &self.counts[..] + } + + fn mins(&self) -> Vec { + self.mins.iter().map(|x| x.clone().as_prim_f32()).collect() + } + + fn maxs(&self) -> Vec { + self.maxs.iter().map(|x| x.clone().as_prim_f32()).collect() + } + + fn avgs(&self) -> Vec { + self.avgs.clone() + } + + fn validate(&self) -> Result<(), String> { + use std::fmt::Write; + let mut msg = String::new(); + if self.ts1s.len() != self.ts2s.len() { + write!(&mut msg, "ts1s ≠ ts2s\n").unwrap(); + } + for (i, ((count, min), max)) in self.counts.iter().zip(&self.mins).zip(&self.maxs).enumerate() { + if min.as_prim_f32() < 1. && *count != 0 { + write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap(); + } + } + if msg.is_empty() { + Ok(()) + } else { + Err(msg) + } + } +} diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs new file mode 100644 index 0000000..09ceb45 --- /dev/null +++ b/items_2/src/eventsdim0.rs @@ -0,0 +1,660 @@ +use crate::binsdim0::MinMaxAvgDim0Bins; +use crate::streams::{CollectableType, CollectorType, ToJsonResult}; +use crate::{pulse_offs_from_abs, ts_offs_from_abs}; +use crate::{ + Empty, Events, ScalarOps, TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner, TimeSeries, + WithLen, +}; +use err::Error; +use netpod::log::*; +use netpod::NanoRange; +use serde::{Deserialize, Serialize}; +use std::any::Any; +use std::collections::VecDeque; +use std::{fmt, mem}; + +// TODO in this module reduce clones. + +#[derive(Serialize, Deserialize)] +pub struct EventsDim0 { + pub tss: Vec, + pub pulses: Vec, + pub values: Vec, +} + +impl EventsDim0 { + #[inline(always)] + pub fn push(&mut self, ts: u64, pulse: u64, value: NTY) { + self.tss.push(ts); + self.pulses.push(pulse); + self.values.push(value); + } +} + +impl Empty for EventsDim0 { + fn empty() -> Self { + Self { + tss: vec![], + pulses: vec![], + values: vec![], + } + } +} + +impl fmt::Debug for EventsDim0 +where + NTY: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!( + fmt, + "count {} ts {:?} .. {:?} vals {:?} .. {:?}", + self.tss.len(), + self.tss.first(), + self.tss.last(), + self.values.first(), + self.values.last(), + ) + } +} + +impl WithLen for EventsDim0 { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl TimeSeries for EventsDim0 { + fn ts_min(&self) -> Option { + self.tss.first().map(Clone::clone) + } + + fn ts_max(&self) -> Option { + self.tss.last().map(Clone::clone) + } + + fn ts_min_max(&self) -> Option<(u64, u64)> { + if self.tss.len() == 0 { + None + } else { + Some((self.tss.first().unwrap().clone(), self.tss.last().unwrap().clone())) + } + } +} + +impl TimeBinnableType for EventsDim0 { + type Output = MinMaxAvgDim0Bins; + type Aggregator = EventValuesAggregator; + + fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + let self_name = std::any::type_name::(); + debug!( + "TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}", + range, x_bin_count, do_time_weight + ); + Self::Aggregator::new(range, do_time_weight) + } +} + +pub struct EventValuesCollector { + vals: EventsDim0, + range_complete: bool, + timed_out: bool, +} + +impl EventValuesCollector { + pub fn new() -> Self { + Self { + vals: EventsDim0::empty(), + range_complete: false, + timed_out: false, + } + } +} + +impl WithLen for EventValuesCollector { + fn len(&self) -> usize { + self.vals.tss.len() + } +} + +#[derive(Serialize)] +pub struct EventValuesCollectorOutput { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: Vec, + #[serde(rename = "tsNs")] + ts_off_ns: Vec, + #[serde(rename = "pulseAnchor")] + pulse_anchor: u64, + #[serde(rename = "pulseOff")] + pulse_off: Vec, + values: Vec, + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + range_complete: bool, + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] + timed_out: bool, +} + +impl ToJsonResult for EventValuesCollectorOutput { + fn to_json_result(&self) -> Result, Error> { + let k = serde_json::to_value(self)?; + Ok(Box::new(k)) + } +} + +impl CollectorType for EventValuesCollector { + type Input = EventsDim0; + type Output = EventValuesCollectorOutput; + + fn ingest(&mut self, src: &mut Self::Input) { + // TODO could be optimized by non-contiguous container. + self.vals.tss.append(&mut src.tss); + self.vals.pulses.append(&mut src.pulses); + self.vals.values.append(&mut src.values); + } + + fn set_range_complete(&mut self) { + self.range_complete = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(&mut self) -> Result { + let tst = ts_offs_from_abs(&self.vals.tss); + let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses); + let ret = Self::Output { + ts_anchor_sec: tst.0, + ts_off_ms: tst.1, + ts_off_ns: tst.2, + pulse_anchor, + pulse_off, + values: mem::replace(&mut self.vals.values, Vec::new()), + range_complete: self.range_complete, + timed_out: self.timed_out, + }; + Ok(ret) + } +} + +impl CollectableType for EventsDim0 { + type Collector = EventValuesCollector; + + fn new_collector() -> Self::Collector { + Self::Collector::new() + } +} + +pub struct EventValuesAggregator { + range: NanoRange, + count: u64, + min: NTY, + max: NTY, + sumc: u64, + sum: f32, + int_ts: u64, + last_ts: u64, + last_val: Option, + do_time_weight: bool, + events_taken_count: u64, + events_ignored_count: u64, +} + +impl Drop for EventValuesAggregator { + fn drop(&mut self) { + // TODO collect as stats for the request context: + trace!( + "taken {} ignored {}", + self.events_taken_count, + self.events_ignored_count + ); + } +} + +impl EventValuesAggregator { + pub fn new(range: NanoRange, do_time_weight: bool) -> Self { + let int_ts = range.beg; + Self { + range, + count: 0, + min: NTY::zero(), + max: NTY::zero(), + sum: 0., + sumc: 0, + int_ts, + last_ts: 0, + last_val: None, + do_time_weight, + events_taken_count: 0, + events_ignored_count: 0, + } + } + + // TODO reduce clone.. optimize via more traits to factor the trade-offs? + fn apply_min_max(&mut self, val: NTY) { + if self.count == 0 { + self.min = val.clone(); + self.max = val.clone(); + } else { + if self.min > val { + self.min = val.clone(); + } + if self.max < val { + self.max = val.clone(); + } + } + } + + fn apply_event_unweight(&mut self, val: NTY) { + let vf = val.as_prim_f32(); + self.apply_min_max(val); + if vf.is_nan() { + } else { + self.sum += vf; + self.sumc += 1; + } + } + + fn apply_event_time_weight(&mut self, ts: u64) { + if let Some(v) = &self.last_val { + let vf = v.as_prim_f32(); + let v2 = v.clone(); + self.apply_min_max(v2); + let w = if self.do_time_weight { + (ts - self.int_ts) as f32 * 1e-9 + } else { + 1. + }; + if vf.is_nan() { + } else { + self.sum += vf * w; + self.sumc += 1; + } + self.int_ts = ts; + } else { + debug!( + "apply_event_time_weight NO VALUE {}", + ts as i64 - self.range.beg as i64 + ); + } + } + + fn ingest_unweight(&mut self, item: &::Input) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let val = item.values[i1].clone(); + if ts < self.range.beg { + self.events_ignored_count += 1; + } else if ts >= self.range.end { + self.events_ignored_count += 1; + return; + } else { + self.apply_event_unweight(val); + self.count += 1; + self.events_taken_count += 1; + } + } + } + + fn ingest_time_weight(&mut self, item: &::Input) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let val = item.values[i1].clone(); + if ts < self.int_ts { + if self.last_val.is_none() { + info!( + "ingest_time_weight event before range, only set last ts {} val {:?}", + ts, val + ); + } + self.events_ignored_count += 1; + self.last_ts = ts; + self.last_val = Some(val); + } else if ts >= self.range.end { + self.events_ignored_count += 1; + return; + } else { + self.apply_event_time_weight(ts); + if self.last_val.is_none() { + info!( + "call apply_min_max without last val, use current instead {} {:?}", + ts, val + ); + self.apply_min_max(val.clone()); + } + self.count += 1; + self.last_ts = ts; + self.last_val = Some(val); + self.events_taken_count += 1; + } + } + } + + fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgDim0Bins { + let (min, max, avg) = if self.sumc > 0 { + let avg = self.sum / self.sumc as f32; + (self.min.clone(), self.max.clone(), avg) + } else { + let g = match &self.last_val { + Some(x) => x.clone(), + None => NTY::zero(), + }; + (g.clone(), g.clone(), g.as_prim_f32()) + }; + let ret = MinMaxAvgDim0Bins { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![min], + maxs: vec![max], + avgs: vec![avg], + }; + self.int_ts = range.beg; + self.range = range; + self.count = 0; + self.sum = 0f32; + self.sumc = 0; + ret + } + + fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgDim0Bins { + // TODO check callsite for correct expand status. + if expand { + debug!("result_reset_time_weight calls apply_event_time_weight"); + self.apply_event_time_weight(self.range.end); + } else { + debug!("result_reset_time_weight NO EXPAND"); + } + let (min, max, avg) = if self.sumc > 0 { + let avg = self.sum / (self.range.delta() as f32 * 1e-9); + (self.min.clone(), self.max.clone(), avg) + } else { + let g = match &self.last_val { + Some(x) => x.clone(), + None => NTY::zero(), + }; + (g.clone(), g.clone(), g.as_prim_f32()) + }; + let ret = MinMaxAvgDim0Bins { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![min], + maxs: vec![max], + avgs: vec![avg], + }; + self.int_ts = range.beg; + self.range = range; + self.count = 0; + self.sum = 0f32; + self.sumc = 0; + ret + } +} + +impl TimeBinnableTypeAggregator for EventValuesAggregator { + type Input = EventsDim0; + type Output = MinMaxAvgDim0Bins; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + debug!("ingest len {}", item.len()); + if self.do_time_weight { + self.ingest_time_weight(item) + } else { + self.ingest_unweight(item) + } + } + + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { + debug!("Produce for {:?} next {:?}", self.range, range); + if self.do_time_weight { + self.result_reset_time_weight(range, expand) + } else { + self.result_reset_unweight(range, expand) + } + } +} + +impl TimeBinnable for EventsDim0 { + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { + let ret = ScalarEventsTimeBinner::::new(edges.into(), do_time_weight); + Box::new(ret) + } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } +} + +impl Events for EventsDim0 { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable { + self as &dyn TimeBinnable + } + + fn verify(&self) { + let mut ts_max = 0; + for ts in &self.tss { + let ts = *ts; + if ts < ts_max { + error!("unordered event data ts {} ts_max {}", ts, ts_max); + } + ts_max = ts_max.max(ts); + } + } + + fn output_info(&self) { + if false { + info!("output_info len {}", self.tss.len()); + if self.tss.len() == 1 { + info!( + " only: ts {} pulse {} value {:?}", + self.tss[0], self.pulses[0], self.values[0] + ); + } else if self.tss.len() > 1 { + info!( + " first: ts {} pulse {} value {:?}", + self.tss[0], self.pulses[0], self.values[0] + ); + let n = self.tss.len() - 1; + info!( + " last: ts {} pulse {} value {:?}", + self.tss[n], self.pulses[n], self.values[n] + ); + } + } + } + + fn as_collectable_mut(&mut self) -> &mut dyn crate::streams::Collectable { + self + } +} + +pub struct ScalarEventsTimeBinner { + // The first two edges are used the next time that we create an aggregator, or push a zero bin. + edges: VecDeque, + do_time_weight: bool, + agg: Option>, + ready: Option< as TimeBinnableTypeAggregator>::Output>, +} + +impl ScalarEventsTimeBinner { + fn new(edges: VecDeque, do_time_weight: bool) -> Self { + Self { + edges, + do_time_weight, + agg: None, + ready: None, + } + } + + fn next_bin_range(&mut self) -> Option { + if self.edges.len() >= 2 { + let ret = NanoRange { + beg: self.edges[0], + end: self.edges[1], + }; + self.edges.pop_front(); + Some(ret) + } else { + None + } + } +} + +impl TimeBinner for ScalarEventsTimeBinner { + fn bins_ready_count(&self) -> usize { + match &self.ready { + Some(k) => k.len(), + None => 0, + } + } + + fn bins_ready(&mut self) -> Option> { + match self.ready.take() { + Some(k) => Some(Box::new(k)), + None => None, + } + } + + fn ingest(&mut self, item: &dyn TimeBinnable) { + const SELF: &str = "ScalarEventsTimeBinner"; + if item.len() == 0 { + // Return already here, RangeOverlapInfo would not give much sense. + return; + } + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} no more bin in edges A"); + return; + } + // TODO optimize by remembering at which event array index we have arrived. + // That needs modified interfaces which can take and yield the start and latest index. + loop { + while item.starts_after(NanoRange { + beg: 0, + end: self.edges[1], + }) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} no more bin in edges B"); + return; + } + } + if item.ends_before(NanoRange { + beg: self.edges[0], + end: u64::MAX, + }) { + return; + } else { + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} edge list exhausted"); + return; + } else { + let agg = if let Some(agg) = self.agg.as_mut() { + agg + } else { + self.agg = Some(EventValuesAggregator::new( + // We know here that we have enough edges for another bin. + // and `next_bin_range` will pop the first edge. + self.next_bin_range().unwrap(), + self.do_time_weight, + )); + self.agg.as_mut().unwrap() + }; + if let Some(item) = item + .as_any() + // TODO make statically sure that we attempt to cast to the correct type here: + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + { + // TODO collect statistics associated with this request: + agg.ingest(item); + } else { + error!("not correct item type"); + }; + if item.ends_after(agg.range().clone()) { + self.cycle(); + if self.edges.len() < 2 { + warn!("TimeBinnerDyn for {SELF} no more bin in edges C"); + return; + } + } else { + break; + } + } + } + } + } + + fn push_in_progress(&mut self, push_empty: bool) { + // TODO expand should be derived from AggKind. Is it still required after all? + // TODO here, the expand means that agg will assume that the current value is kept constant during + // the rest of the time range. + let expand = true; + let range_next = if self.agg.is_some() { + if let Some(x) = self.next_bin_range() { + Some(x) + } else { + None + } + } else { + None + }; + if let Some(agg) = self.agg.as_mut() { + let bins; + if let Some(range_next) = range_next { + bins = agg.result_reset(range_next, expand); + } else { + let range_next = NanoRange { beg: 4, end: 5 }; + bins = agg.result_reset(range_next, expand); + self.agg = None; + } + assert_eq!(bins.len(), 1); + if push_empty || bins.counts[0] != 0 { + match self.ready.as_mut() { + Some(_ready) => { + error!("TODO eventsdim0 time binner append"); + err::todo(); + //ready.append(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + } + } + } + + fn cycle(&mut self) { + let n = self.bins_ready_count(); + self.push_in_progress(true); + if self.bins_ready_count() == n { + if let Some(_range) = self.next_bin_range() { + let bins = MinMaxAvgDim0Bins::::empty(); + error!("TODO eventsdim0 time binner append"); + err::todo(); + //bins.append_zero(range.beg, range.end); + match self.ready.as_mut() { + Some(_ready) => { + error!("TODO eventsdim0 time binner append"); + err::todo(); + //ready.append(&mut bins); + } + None => { + self.ready = Some(bins); + } + } + if self.bins_ready_count() <= n { + error!("failed to push a zero bin"); + } + } else { + warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); + } + } + } +} diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs new file mode 100644 index 0000000..5185cd5 --- /dev/null +++ b/items_2/src/items_2.rs @@ -0,0 +1,456 @@ +pub mod binsdim0; +pub mod eventsdim0; +pub mod streams; + +use chrono::{DateTime, TimeZone, Utc}; +use futures_util::Stream; +use netpod::log::error; +use netpod::timeunits::*; +use netpod::{AggKind, NanoRange, ScalarType, Shape}; +use serde::{Deserialize, Serialize, Serializer}; +use std::any::Any; +use std::fmt; +use std::pin::Pin; +use std::task::{Context, Poll}; +use streams::Collectable; + +pub fn bool_is_false(x: &bool) -> bool { + *x == false +} + +pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec, Vec) { + let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC; + let ts_anchor_ns = ts_anchor_sec * SEC; + let ts_off_ms: Vec<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); + let ts_off_ns = tss + .iter() + .zip(ts_off_ms.iter().map(|&k| k * MS)) + .map(|(&j, k)| (j - ts_anchor_ns - k)) + .collect(); + (ts_anchor_sec, ts_off_ms, ts_off_ns) +} + +pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, Vec) { + let pulse_anchor = pulse.first().map_or(0, |k| *k); + let pulse_off: Vec<_> = pulse.iter().map(|k| *k - pulse_anchor).collect(); + (pulse_anchor, pulse_off) +} + +#[allow(unused)] +const fn is_nan_int(_x: &T) -> bool { + false +} + +#[allow(unused)] +fn is_nan_f32(x: f32) -> bool { + x.is_nan() +} + +#[allow(unused)] +fn is_nan_f64(x: f64) -> bool { + x.is_nan() +} + +pub trait AsPrimF32 { + fn as_prim_f32(&self) -> f32; +} + +macro_rules! impl_as_prim_f32 { + ($ty:ident) => { + impl AsPrimF32 for $ty { + fn as_prim_f32(&self) -> f32 { + *self as f32 + } + } + }; +} + +impl_as_prim_f32!(u8); +impl_as_prim_f32!(u16); +impl_as_prim_f32!(u32); +impl_as_prim_f32!(u64); +impl_as_prim_f32!(i8); +impl_as_prim_f32!(i16); +impl_as_prim_f32!(i32); +impl_as_prim_f32!(i64); +impl_as_prim_f32!(f32); +impl_as_prim_f32!(f64); + +pub trait ScalarOps: fmt::Debug + Clone + PartialOrd + AsPrimF32 + Serialize + Unpin + Send + 'static { + fn zero() -> Self; +} + +macro_rules! impl_num_ops { + ($ty:ident, $zero:expr) => { + impl ScalarOps for $ty { + fn zero() -> Self { + $zero + } + } + }; +} + +impl_num_ops!(u8, 0); +impl_num_ops!(u16, 0); +impl_num_ops!(u32, 0); +impl_num_ops!(u64, 0); +impl_num_ops!(i8, 0); +impl_num_ops!(i16, 0); +impl_num_ops!(i32, 0); +impl_num_ops!(i64, 0); +impl_num_ops!(f32, 0.); +impl_num_ops!(f64, 0.); + +#[allow(unused)] +struct Ts(u64); + +struct Error { + #[allow(unused)] + kind: ErrorKind, +} + +impl From for Error { + fn from(kind: ErrorKind) -> Self { + Self { kind } + } +} + +enum ErrorKind { + #[allow(unused)] + MismatchedType, +} + +pub trait WithLen { + fn len(&self) -> usize; +} + +pub trait TimeSeries { + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + fn ts_min_max(&self) -> Option<(u64, u64)>; +} + +pub enum Fits { + Empty, + Lower, + Greater, + Inside, + PartlyLower, + PartlyGreater, + PartlyLowerAndGreater, +} + +// TODO can this be removed? +pub trait FitsInside { + fn fits_inside(&self, range: NanoRange) -> Fits; +} + +impl FitsInside for T +where + T: TimeSeries, +{ + fn fits_inside(&self, range: NanoRange) -> Fits { + if let Some((min, max)) = self.ts_min_max() { + if max <= range.beg { + Fits::Lower + } else if min >= range.end { + Fits::Greater + } else if min < range.beg && max > range.end { + Fits::PartlyLowerAndGreater + } else if min < range.beg { + Fits::PartlyLower + } else if max > range.end { + Fits::PartlyGreater + } else { + Fits::Inside + } + } else { + Fits::Empty + } + } +} + +pub trait RangeOverlapInfo { + fn ends_before(&self, range: NanoRange) -> bool; + fn ends_after(&self, range: NanoRange) -> bool; + fn starts_after(&self, range: NanoRange) -> bool; +} + +impl RangeOverlapInfo for T +where + T: TimeSeries, +{ + fn ends_before(&self, range: NanoRange) -> bool { + if let Some(max) = self.ts_max() { + max <= range.beg + } else { + true + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + if let Some(max) = self.ts_max() { + max > range.end + } else { + true + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + if let Some(min) = self.ts_min() { + min >= range.end + } else { + true + } + } +} + +pub trait EmptyForScalarTypeShape { + fn empty(scalar_type: ScalarType, shape: Shape) -> Self; +} + +pub trait EmptyForShape { + fn empty(shape: Shape) -> Self; +} + +pub trait Empty { + fn empty() -> Self; +} + +pub trait AppendEmptyBin { + fn append_empty_bin(&mut self, ts1: u64, ts2: u64); +} + +#[derive(Clone, Debug, Deserialize)] +pub struct IsoDateTime(DateTime); + +impl Serialize for IsoDateTime { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string()) + } +} + +pub fn make_iso_ts(tss: &[u64]) -> Vec { + tss.iter() + .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) + .collect() +} + +pub trait TimeBinner: Send { + fn bins_ready_count(&self) -> usize; + fn bins_ready(&mut self) -> Option>; + fn ingest(&mut self, item: &dyn TimeBinnable); + + /// If there is a bin in progress with non-zero count, push it to the result set. + /// With push_empty == true, a bin in progress is pushed even if it contains no counts. + fn push_in_progress(&mut self, push_empty: bool); + + /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call + /// to `push_in_progress` did not change the result count, as long as edges are left. + /// The next call to `Self::bins_ready_count` must return one higher count than before. + fn cycle(&mut self); +} + +/// Provides a time-binned representation of the implementing type. +/// In contrast to `TimeBinnableType` this is meant for trait objects. +pub trait TimeBinnable: WithLen + RangeOverlapInfo + Any + Send { + fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box; + fn as_any(&self) -> &dyn Any; +} + +/// Container of some form of events, for use as trait object. +pub trait Events: Collectable + TimeBinnable { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; + fn verify(&self); + fn output_info(&self); + fn as_collectable_mut(&mut self) -> &mut dyn Collectable; +} + +/// Data in time-binned form. +pub trait TimeBinned: TimeBinnable { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; + fn edges_slice(&self) -> (&[u64], &[u64]); + fn counts(&self) -> &[u64]; + fn mins(&self) -> Vec; + fn maxs(&self) -> Vec; + fn avgs(&self) -> Vec; + fn validate(&self) -> Result<(), String>; +} + +pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo { + type Output: TimeBinnableType; + type Aggregator: TimeBinnableTypeAggregator + Send + Unpin; + fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator; +} + +pub trait TimeBinnableTypeAggregator: Send { + type Input: TimeBinnableType; + type Output: TimeBinnableType; + fn range(&self) -> &NanoRange; + fn ingest(&mut self, item: &Self::Input); + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output; +} + +pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { + match shape { + Shape::Scalar => match agg_kind { + AggKind::TimeWeightedScalar => { + use ScalarType::*; + type K = eventsdim0::EventsDim0; + match scalar_type { + U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + _ => { + error!("TODO empty_events_dyn"); + err::todoval() + } + } + } + _ => { + error!("TODO empty_events_dyn"); + err::todoval() + } + }, + Shape::Wave(..) => { + error!("TODO empty_events_dyn"); + err::todoval() + } + Shape::Image(..) => { + error!("TODO empty_events_dyn"); + err::todoval() + } + } +} + +pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { + match shape { + Shape::Scalar => match agg_kind { + AggKind::TimeWeightedScalar => { + use ScalarType::*; + type K = binsdim0::MinMaxAvgDim0Bins; + match scalar_type { + U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + _ => { + error!("TODO empty_binned_dyn"); + err::todoval() + } + } + } + _ => { + error!("TODO empty_binned_dyn"); + err::todoval() + } + }, + Shape::Wave(_n) => match agg_kind { + AggKind::DimXBins1 => { + use ScalarType::*; + type K = binsdim0::MinMaxAvgDim0Bins; + match scalar_type { + U8 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + _ => { + error!("TODO empty_binned_dyn"); + err::todoval() + } + } + } + _ => { + error!("TODO empty_binned_dyn"); + err::todoval() + } + }, + Shape::Image(..) => { + error!("TODO empty_binned_dyn"); + err::todoval() + } + } +} + +pub enum ConnStatus {} + +pub struct ConnStatusEvent { + pub ts: u64, + pub status: ConnStatus, +} + +trait MergableEvents: Any { + fn len(&self) -> usize; + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + fn take_from(&mut self, src: &mut dyn MergableEvents, ts_end: u64) -> Result<(), Error>; +} + +pub enum ChannelEvents { + Events(Box), + Status(ConnStatusEvent), + RangeComplete, +} + +impl MergableEvents for ChannelEvents { + fn len(&self) -> usize { + error!("TODO MergableEvents"); + todo!() + } + + fn ts_min(&self) -> Option { + error!("TODO MergableEvents"); + todo!() + } + + fn ts_max(&self) -> Option { + error!("TODO MergableEvents"); + todo!() + } + + fn take_from(&mut self, _src: &mut dyn MergableEvents, _ts_end: u64) -> Result<(), Error> { + error!("TODO MergableEvents"); + todo!() + } +} + +struct ChannelEventsMerger { + _inp_1: Pin, Error>>>>, + _inp_2: Pin, Error>>>>, +} + +impl Stream for ChannelEventsMerger { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + //use Poll::*; + error!("TODO ChannelEventsMerger"); + err::todoval() + } +} + +// TODO do this with some blanket impl: +impl Collectable for Box { + fn new_collector(&self) -> Box { + Collectable::new_collector(self.as_ref()) + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + Collectable::as_any_mut(self.as_mut()) + } +} diff --git a/items_2/src/streams.rs b/items_2/src/streams.rs new file mode 100644 index 0000000..2a6c511 --- /dev/null +++ b/items_2/src/streams.rs @@ -0,0 +1,77 @@ +use crate::WithLen; +use err::Error; +use serde::Serialize; +use std::any::Any; + +pub trait CollectorType: Send + Unpin + WithLen { + type Input: Collectable; + type Output: ToJsonResult + Serialize; + + fn ingest(&mut self, src: &mut Self::Input); + fn set_range_complete(&mut self); + fn set_timed_out(&mut self); + fn result(&mut self) -> Result; +} + +pub trait Collector: Send + Unpin + WithLen { + fn ingest(&mut self, src: &mut dyn Collectable); + fn set_range_complete(&mut self); + fn set_timed_out(&mut self); + fn result(&mut self) -> Result, Error>; +} + +pub trait CollectableType { + type Collector: CollectorType; + + fn new_collector() -> Self::Collector; +} + +pub trait Collectable: Any { + fn new_collector(&self) -> Box; + fn as_any_mut(&mut self) -> &mut dyn Any; +} + +impl Collector for T { + fn ingest(&mut self, src: &mut dyn Collectable) { + let src: &mut ::Input = src.as_any_mut().downcast_mut().expect("can not downcast"); + T::ingest(self, src) + } + + fn set_range_complete(&mut self) { + T::set_range_complete(self) + } + + fn set_timed_out(&mut self) { + T::set_timed_out(self) + } + + fn result(&mut self) -> Result, Error> { + let ret = T::result(self)?; + Ok(Box::new(ret) as _) + } +} + +impl Collectable for T { + fn new_collector(&self) -> Box { + Box::new(T::new_collector()) as _ + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + // TODO interesting: why exactly does returning `&mut self` not work here? + self + } +} + +pub trait ToJsonBytes { + fn to_json_bytes(&self) -> Result, Error>; +} + +pub trait ToJsonResult { + fn to_json_result(&self) -> Result, Error>; +} + +impl ToJsonBytes for serde_json::Value { + fn to_json_bytes(&self) -> Result, Error> { + Ok(serde_json::to_vec(self)?) + } +} diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 37a3d1a..354ce8c 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -94,6 +94,182 @@ impl RawEventsQuery { } } +// TODO move this query type out of this `binned` mod +#[derive(Clone, Debug)] +pub struct PlainEventsQuery { + channel: Channel, + range: NanoRange, + disk_io_buffer_size: usize, + report_error: bool, + timeout: Duration, + events_max: Option, + do_log: bool, + do_test_main_error: bool, + do_test_stream_error: bool, +} + +impl PlainEventsQuery { + pub fn new( + channel: Channel, + range: NanoRange, + disk_io_buffer_size: usize, + events_max: Option, + do_log: bool, + ) -> Self { + Self { + channel, + range, + disk_io_buffer_size, + report_error: false, + timeout: Duration::from_millis(10000), + events_max, + do_log, + do_test_main_error: false, + do_test_stream_error: false, + } + } + + pub fn channel(&self) -> &Channel { + &self.channel + } + + pub fn range(&self) -> &NanoRange { + &self.range + } + + pub fn report_error(&self) -> bool { + self.report_error + } + + pub fn disk_io_buffer_size(&self) -> usize { + self.disk_io_buffer_size + } + + pub fn timeout(&self) -> Duration { + self.timeout + } + + pub fn events_max(&self) -> Option { + self.events_max + } + + pub fn do_log(&self) -> bool { + self.do_log + } + + pub fn do_test_main_error(&self) -> bool { + self.do_test_main_error + } + + pub fn do_test_stream_error(&self) -> bool { + self.do_test_stream_error + } + + pub fn set_series_id(&mut self, series: u64) { + self.channel.series = Some(series); + } + + pub fn set_timeout(&mut self, k: Duration) { + self.timeout = k; + } + + pub fn set_do_test_main_error(&mut self, k: bool) { + self.do_test_main_error = k; + } + + pub fn set_do_test_stream_error(&mut self, k: bool) { + self.do_test_stream_error = k; + } +} + +impl HasBackend for PlainEventsQuery { + fn backend(&self) -> &str { + &self.channel.backend + } +} + +impl HasTimeout for PlainEventsQuery { + fn timeout(&self) -> Duration { + self.timeout.clone() + } +} + +impl FromUrl for PlainEventsQuery { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { + let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?; + let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?; + let ret = Self { + channel: Channel::from_pairs(&pairs)?, + range: NanoRange { + beg: beg_date.parse::>()?.to_nanos(), + end: end_date.parse::>()?.to_nanos(), + }, + disk_io_buffer_size: pairs + .get("diskIoBufferSize") + .map_or("4096", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, + report_error: pairs + .get("reportError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?, + timeout: pairs + .get("timeout") + .map_or("10000", |k| k) + .parse::() + .map(|k| Duration::from_millis(k)) + .map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?, + events_max: pairs + .get("eventsMax") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + do_log: pairs + .get("doLog") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?, + do_test_main_error: pairs + .get("doTestMainError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError {:?}", e)))?, + do_test_stream_error: pairs + .get("doTestStreamError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?, + }; + Ok(ret) + } +} + +impl AppendToUrl for PlainEventsQuery { + fn append_to_url(&self, url: &mut Url) { + let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + self.channel.append_to_url(url); + let mut g = url.query_pairs_mut(); + g.append_pair( + "begDate", + &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), + ); + g.append_pair( + "endDate", + &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), + ); + g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); + g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); + if let Some(x) = self.events_max.as_ref() { + g.append_pair("eventsMax", &format!("{}", x)); + } + g.append_pair("doLog", &format!("{}", self.do_log)); + } +} + #[derive(Clone, Debug)] pub struct BinnedQuery { channel: Channel, diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index 04fb121..a259ad2 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -23,8 +23,8 @@ futures-core = "0.3.14" futures-util = "0.3.14" tracing = "0.1.25" hex = "0.4.3" -scylla = "0.4.4" -tokio-postgres = "0.7.6" +scylla = "0.5" +tokio-postgres = "0.7.7" err = { path = "../err" } netpod = { path = "../netpod" } disk = { path = "../disk" } diff --git a/scyllaconn/Cargo.toml b/scyllaconn/Cargo.toml new file mode 100644 index 0000000..d393286 --- /dev/null +++ b/scyllaconn/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "scyllaconn" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/scyllaconn.rs" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_cbor = "0.11.2" +erased-serde = "0.3" +tokio = { version = "1.20.1", default-features = false, features = ["time", "sync"] } +byteorder = "1.4.3" +bytes = "1.2.1" +num-traits = "0.2.15" +chrono = { version = "0.4.19", features = ["serde"] } +crc32fast = "1.3.2" +futures-util = "0.3.24" +async-channel = "1.7.1" +scylla = "0.5" +tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] } +err = { path = "../err" } +netpod = { path = "../netpod" } +items_2 = { path = "../items_2" } diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs new file mode 100644 index 0000000..3c2cd10 --- /dev/null +++ b/scyllaconn/src/bincache.rs @@ -0,0 +1,468 @@ +use crate::errconv::ErrConv; +use crate::events::EventsStreamScylla; +use err::Error; +use futures_util::{Future, Stream, StreamExt}; +use items_2::binsdim0::MinMaxAvgDim0Bins; +use items_2::{empty_binned_dyn, empty_events_dyn, ChannelEvents, TimeBinned}; +use netpod::log::*; +use netpod::query::{CacheUsage, PlainEventsQuery, RawEventsQuery}; +use netpod::timeunits::*; +use netpod::{AggKind, ChannelTyped, ScalarType, Shape}; +use netpod::{PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange}; +use scylla::Session as ScySession; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +pub async fn read_cached_scylla( + series: u64, + chn: &ChannelTyped, + coord: &PreBinnedPatchCoord, + _agg_kind: AggKind, + scy: &ScySession, +) -> Result>, Error> { + let vals = ( + series as i64, + (coord.bin_t_len() / SEC) as i32, + (coord.patch_t_len() / SEC) as i32, + coord.ix() as i64, + ); + let res = scy + .query_iter( + "select counts, avgs, mins, maxs from binned_scalar_f32 where series = ? and bin_len_sec = ? and patch_len_sec = ? and agg_kind = 'dummy-agg-kind' and offset = ?", + vals, + ) + .await; + let mut res = res.err_conv().map_err(|e| { + error!("can not read from cache"); + e + })?; + while let Some(item) = res.next().await { + let row = item.err_conv()?; + let edges = coord.edges(); + let (counts, avgs, mins, maxs): (Vec, Vec, Vec, Vec) = row.into_typed().err_conv()?; + let mut counts_mismatch = false; + if edges.len() != counts.len() + 1 { + counts_mismatch = true; + } + if counts.len() != avgs.len() { + counts_mismatch = true; + } + let ts1s = edges[..(edges.len() - 1).min(edges.len())].to_vec(); + let ts2s = edges[1.min(edges.len())..].to_vec(); + if ts1s.len() != ts2s.len() { + error!("ts1s vs ts2s mismatch"); + counts_mismatch = true; + } + if ts1s.len() != counts.len() { + counts_mismatch = true; + } + let avgs = avgs.into_iter().map(|x| x).collect::>(); + let mins = mins.into_iter().map(|x| x as _).collect::>(); + let maxs = maxs.into_iter().map(|x| x as _).collect::>(); + if counts_mismatch { + error!( + "mismatch: edges {} ts1s {} ts2s {} counts {} avgs {} mins {} maxs {}", + edges.len(), + ts1s.len(), + ts2s.len(), + counts.len(), + avgs.len(), + mins.len(), + maxs.len(), + ); + } + let counts: Vec<_> = counts.into_iter().map(|x| x as u64).collect(); + // TODO construct a dyn TimeBinned using the scalar type and shape information. + // TODO place the values with little copying into the TimeBinned. + use ScalarType::*; + use Shape::*; + match &chn.shape { + Scalar => match &chn.scalar_type { + F64 => { + let ret = MinMaxAvgDim0Bins:: { + ts1s, + ts2s, + counts, + avgs, + mins, + maxs, + }; + return Ok(Some(Box::new(ret))); + } + _ => { + error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape); + err::todoval() + } + }, + _ => { + error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape); + err::todoval() + } + } + } + Ok(None) +} + +#[allow(unused)] +struct WriteFut<'a> { + chn: &'a ChannelTyped, + coord: &'a PreBinnedPatchCoord, + data: &'a dyn TimeBinned, + scy: &'a ScySession, +} + +impl<'a> WriteFut<'a> { + #[allow(unused)] + fn new( + chn: &'a ChannelTyped, + coord: &'a PreBinnedPatchCoord, + data: &'a dyn TimeBinned, + scy: &'a ScySession, + ) -> Self { + Self { chn, coord, data, scy } + } +} + +impl<'a> Future for WriteFut<'a> { + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let _ = cx; + Poll::Ready(Ok(())) + } +} + +pub fn write_cached_scylla<'a>( + series: u64, + _chn: &'a ChannelTyped, + coord: &'a PreBinnedPatchCoord, + //data: &'a dyn TimeBinned, + data: Box, + //scy: &'a ScySession, + _scy: Arc, +) -> Pin> + Send + 'a>> { + //let _chn = unsafe { &*(chn as *const ChannelTyped) }; + //let data_ptr = data as *const dyn TimeBinned as usize; + //let scy_ptr = scy as *const ScySession as usize; + let fut = async move { + //let data = unsafe { &*(data_ptr as *const dyn TimeBinned) }; + //let scy = unsafe { &*(scy_ptr as *const ScySession) }; + let bin_len_sec = (coord.bin_t_len() / SEC) as i32; + let patch_len_sec = (coord.patch_t_len() / SEC) as i32; + let offset = coord.ix(); + warn!( + "write_cached_scylla len {} where series = {} and bin_len_sec = {} and patch_len_sec = {} and agg_kind = 'dummy-agg-kind' and offset = {}", + data.counts().len(), + series, + bin_len_sec, + patch_len_sec, + offset, + ); + let _data2 = data.counts().iter().map(|x| *x as i64).collect::>(); + /* + let stmt = scy.prepare("insert into binned_scalar_f32 (series, bin_len_sec, patch_len_sec, agg_kind, offset, counts, avgs, mins, maxs) values (?, ?, ?, 'dummy-agg-kind', ?, ?, ?, ?, ?)").await.err_conv()?; + scy.execute( + &stmt, + ( + series as i64, + bin_len_sec, + patch_len_sec, + offset as i64, + data2, + data.avgs(), + data.mins(), + data.maxs(), + ), + ) + .await + .err_conv() + .map_err(|e| { + error!("can not write to cache"); + e + })?; + */ + Ok(()) + }; + Box::pin(fut) +} + +pub async fn fetch_uncached_data( + series: u64, + chn: ChannelTyped, + coord: PreBinnedPatchCoord, + agg_kind: AggKind, + cache_usage: CacheUsage, + scy: Arc, +) -> Result, bool)>, Error> { + info!("fetch_uncached_data {coord:?}"); + // Try to find a higher resolution pre-binned grid which covers the requested patch. + let (bin, complete) = match PreBinnedPatchRange::covering_range(coord.patch_range(), coord.bin_count() + 1) { + Ok(Some(range)) => { + if coord.patch_range() != range.range() { + error!( + "The chosen covering range does not exactly cover the requested patch {:?} vs {:?}", + coord.patch_range(), + range.range() + ); + } + fetch_uncached_higher_res_prebinned( + series, + &chn, + coord.clone(), + range, + agg_kind, + cache_usage.clone(), + scy.clone(), + ) + .await + } + Ok(None) => fetch_uncached_binned_events(series, &chn, coord.clone(), agg_kind, scy.clone()).await, + Err(e) => Err(e), + }?; + if true || complete { + let edges = coord.edges(); + if edges.len() < bin.len() + 1 { + error!( + "attempt to write overfull bin to cache edges {} bin {}", + edges.len(), + bin.len() + ); + return Err(Error::with_msg_no_trace(format!( + "attempt to write overfull bin to cache" + ))); + } else if edges.len() > bin.len() + 1 { + let missing = edges.len() - bin.len() - 1; + error!("attempt to write incomplete bin to cache missing {missing}"); + } + if let CacheUsage::Use | CacheUsage::Recreate = &cache_usage { + // TODO pass data in safe way. + let _data = bin.as_ref(); + //let fut = WriteFut::new(&chn, &coord, err::todoval(), &scy); + //fut.await?; + //write_cached_scylla(series, &chn, &coord, bin.as_ref(), &scy).await?; + } + } + Ok(Some((bin, complete))) +} + +pub fn fetch_uncached_data_box( + series: u64, + chn: &ChannelTyped, + coord: &PreBinnedPatchCoord, + agg_kind: AggKind, + cache_usage: CacheUsage, + scy: Arc, +) -> Pin, bool)>, Error>> + Send>> { + Box::pin(fetch_uncached_data( + series, + chn.clone(), + coord.clone(), + agg_kind, + cache_usage, + scy, + )) +} + +pub async fn fetch_uncached_higher_res_prebinned( + series: u64, + chn: &ChannelTyped, + coord: PreBinnedPatchCoord, + range: PreBinnedPatchRange, + agg_kind: AggKind, + cache_usage: CacheUsage, + scy: Arc, +) -> Result<(Box, bool), Error> { + let edges = coord.edges(); + // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. + let do_time_weight = true; + // We must produce some result with correct types even if upstream delivers nothing at all. + let bin0 = empty_binned_dyn(&chn.scalar_type, &chn.shape, &agg_kind); + let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); + let mut complete = true; + let patch_it = PreBinnedPatchIterator::from_range(range.clone()); + for patch_coord in patch_it { + // We request data here for a Coord, meaning that we expect to receive multiple bins. + // The expectation is that we receive a single TimeBinned which contains all bins of that PatchCoord. + //let patch_coord = PreBinnedPatchCoord::new(patch.bin_t_len(), patch.patch_t_len(), patch.ix()); + let (bin, comp) = pre_binned_value_stream_with_scy( + series, + chn, + &patch_coord, + agg_kind.clone(), + cache_usage.clone(), + scy.clone(), + ) + .await?; + if let Err(msg) = bin.validate() { + error!( + "pre-binned intermediate issue {} coord {:?} patch_coord {:?}", + msg, coord, patch_coord + ); + } + complete = complete && comp; + time_binner.ingest(bin.as_time_binnable_dyn()); + } + // Fixed limit to defend against a malformed implementation: + let mut i = 0; + while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize { + let n1 = time_binner.bins_ready_count(); + if false { + trace!( + "pre-binned extra cycle {} {} {}", + i, + time_binner.bins_ready_count(), + coord.bin_count() + ); + } + time_binner.cycle(); + i += 1; + if time_binner.bins_ready_count() <= n1 { + warn!("pre-binned cycle did not add another bin, break"); + break; + } + } + if time_binner.bins_ready_count() < coord.bin_count() as usize { + return Err(Error::with_msg_no_trace(format!( + "pre-binned unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}", + time_binner.bins_ready_count(), + coord.bin_count(), + edges.len(), + ))); + } + let ready = time_binner + .bins_ready() + .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch range")))?; + if let Err(msg) = ready.validate() { + error!("pre-binned final issue {} coord {:?}", msg, coord); + } + Ok((ready, complete)) +} + +pub async fn fetch_uncached_binned_events( + series: u64, + chn: &ChannelTyped, + coord: PreBinnedPatchCoord, + agg_kind: AggKind, + scy: Arc, +) -> Result<(Box, bool), Error> { + let edges = coord.edges(); + // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. + let do_time_weight = true; + // We must produce some result with correct types even if upstream delivers nothing at all. + let bin0 = empty_events_dyn(&chn.scalar_type, &chn.shape, &agg_kind); + let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); + let deadline = Instant::now(); + let deadline = deadline + .checked_add(Duration::from_millis(6000)) + .ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?; + let _evq = RawEventsQuery::new(chn.channel.clone(), coord.patch_range(), agg_kind); + let evq = PlainEventsQuery::new(chn.channel.clone(), coord.patch_range(), 4096, None, true); + let mut events_dyn = EventsStreamScylla::new(series, &evq, chn.scalar_type.clone(), chn.shape.clone(), scy, false); + let mut complete = false; + loop { + let item = tokio::time::timeout_at(deadline.into(), events_dyn.next()).await; + let item = match item { + Ok(Some(k)) => k, + Ok(None) => break, + Err(_) => { + error!("fetch_uncached_binned_events timeout"); + return Err(Error::with_msg_no_trace(format!( + "TODO handle fetch_uncached_binned_events timeout" + ))); + } + }; + match item { + Ok(ChannelEvents::Events(item)) => { + time_binner.ingest(item.as_time_binnable_dyn()); + // TODO could also ask the binner here whether we are "complete" to stop sending useless data. + } + Ok(ChannelEvents::Status(_)) => { + // TODO flag, should not happen. + return Err(Error::with_msg_no_trace(format!( + "unexpected read of channel status events" + ))); + } + Ok(ChannelEvents::RangeComplete) => { + complete = true; + } + Err(e) => return Err(e), + } + } + // Fixed limit to defend against a malformed implementation: + let mut i = 0; + while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize { + let n1 = time_binner.bins_ready_count(); + if false { + trace!( + "events extra cycle {} {} {}", + i, + time_binner.bins_ready_count(), + coord.bin_count() + ); + } + time_binner.cycle(); + i += 1; + if time_binner.bins_ready_count() <= n1 { + warn!("events cycle did not add another bin, break"); + break; + } + } + if time_binner.bins_ready_count() < coord.bin_count() as usize { + return Err(Error::with_msg_no_trace(format!( + "events unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}", + time_binner.bins_ready_count(), + coord.bin_count(), + edges.len(), + ))); + } + let ready = time_binner + .bins_ready() + .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch")))?; + if let Err(msg) = ready.validate() { + error!("time binned invalid {} coord {:?}", msg, coord); + } + Ok((ready, complete)) +} + +pub async fn pre_binned_value_stream_with_scy( + series: u64, + chn: &ChannelTyped, + coord: &PreBinnedPatchCoord, + agg_kind: AggKind, + cache_usage: CacheUsage, + scy: Arc, +) -> Result<(Box, bool), Error> { + trace!("pre_binned_value_stream_with_scy {chn:?} {coord:?}"); + if let (Some(item), CacheUsage::Use) = ( + read_cached_scylla(series, chn, coord, agg_kind.clone(), &scy).await?, + &cache_usage, + ) { + info!("+++++++++++++ GOOD READ"); + Ok((item, true)) + } else { + if let CacheUsage::Use = &cache_usage { + warn!("--+--+--+--+--+--+ NOT YET CACHED"); + } + let res = fetch_uncached_data_box(series, chn, coord, agg_kind, cache_usage, scy).await?; + let (bin, complete) = + res.ok_or_else(|| Error::with_msg_no_trace(format!("pre_binned_value_stream_with_scy got None bin")))?; + Ok((bin, complete)) + } +} + +pub async fn pre_binned_value_stream( + series: u64, + chn: &ChannelTyped, + coord: &PreBinnedPatchCoord, + agg_kind: AggKind, + cache_usage: CacheUsage, + scy: Arc, +) -> Result, Error>> + Send>>, Error> { + trace!("pre_binned_value_stream series {series} {chn:?} {coord:?}"); + let res = pre_binned_value_stream_with_scy(series, chn, coord, agg_kind, cache_usage, scy).await?; + error!("TODO pre_binned_value_stream"); + err::todo(); + Ok(Box::pin(futures_util::stream::iter([Ok(res.0)]))) +} diff --git a/scyllaconn/src/errconv.rs b/scyllaconn/src/errconv.rs new file mode 100644 index 0000000..887f554 --- /dev/null +++ b/scyllaconn/src/errconv.rs @@ -0,0 +1,51 @@ +use err::Error; +use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; +use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; + +pub trait ErrConv { + fn err_conv(self) -> Result; +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg(e.to_string())), + } + } +} + +impl ErrConv for Result> { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg(e.to_string())), + } + } +} +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs new file mode 100644 index 0000000..9055dab --- /dev/null +++ b/scyllaconn/src/events.rs @@ -0,0 +1,606 @@ +use crate::errconv::ErrConv; +use err::Error; +use futures_util::{Future, FutureExt, Stream, StreamExt}; +use items_2::eventsdim0::EventsDim0; +use items_2::{ChannelEvents, Empty, Events}; +use netpod::log::*; +use netpod::query::{ChannelStateEvents, PlainEventsQuery}; +use netpod::timeunits::*; +use netpod::{Channel, Database, NanoRange, ScalarType, Shape}; +use scylla::Session as ScySession; +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio_postgres::Client as PgClient; + +macro_rules! read_values { + ($fname:ident, $self:expr, $ts_msp:expr) => {{ + let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone()); + let fut = fut.map(|x| { + match x { + Ok(k) => { + // TODO why static needed? + //let b = Box::new(k) as Box; + let b = Box::new(k) as Box; + Ok(b) + } + Err(e) => Err(e), + } + }); + let fut = Box::pin(fut) as Pin, Error>> + Send>>; + fut + }}; +} + +struct ReadValues { + series: i64, + scalar_type: ScalarType, + shape: Shape, + range: NanoRange, + ts_msps: VecDeque, + fwd: bool, + fut: Pin, Error>> + Send>>, + scy: Arc, +} + +impl ReadValues { + fn new( + series: i64, + scalar_type: ScalarType, + shape: Shape, + range: NanoRange, + ts_msps: VecDeque, + fwd: bool, + scy: Arc, + ) -> Self { + let mut ret = Self { + series, + scalar_type, + shape, + range, + ts_msps, + fwd, + fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( + "future not initialized", + )))), + scy, + }; + ret.next(); + ret + } + + fn next(&mut self) -> bool { + if let Some(ts_msp) = self.ts_msps.pop_front() { + self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 1); + true + } else { + false + } + } + + fn make_fut( + &mut self, + ts_msp: u64, + _has_more_msp: bool, + ) -> Pin, Error>> + Send>> { + let fut = match &self.shape { + Shape::Scalar => match &self.scalar_type { + ScalarType::I8 => { + read_values!(read_next_values_scalar_i8, self, ts_msp) + } + ScalarType::I16 => { + read_values!(read_next_values_scalar_i16, self, ts_msp) + } + ScalarType::I32 => { + read_values!(read_next_values_scalar_i32, self, ts_msp) + } + ScalarType::F32 => { + read_values!(read_next_values_scalar_f32, self, ts_msp) + } + ScalarType::F64 => { + read_values!(read_next_values_scalar_f64, self, ts_msp) + } + _ => { + error!("TODO ReadValues add more types"); + err::todoval() + } + }, + Shape::Wave(_) => match &self.scalar_type { + ScalarType::U16 => { + read_values!(read_next_values_array_u16, self, ts_msp) + } + _ => { + error!("TODO ReadValues add more types"); + err::todoval() + } + }, + _ => { + error!("TODO ReadValues add more types"); + err::todoval() + } + }; + fut + } +} + +enum FrState { + New, + FindMsp(Pin, Error>> + Send>>), + ReadBack1(ReadValues), + ReadBack2(ReadValues), + ReadValues(ReadValues), + Done, +} + +pub struct EventsStreamScylla { + state: FrState, + series: u64, + scalar_type: ScalarType, + shape: Shape, + range: NanoRange, + ts_msps: VecDeque, + scy: Arc, + do_test_stream_error: bool, +} + +impl EventsStreamScylla { + pub fn new( + series: u64, + evq: &PlainEventsQuery, + scalar_type: ScalarType, + shape: Shape, + scy: Arc, + do_test_stream_error: bool, + ) -> Self { + Self { + state: FrState::New, + series, + scalar_type, + shape, + range: evq.range().clone(), + ts_msps: VecDeque::new(), + scy, + do_test_stream_error, + } + } + + fn ts_msps_found(&mut self, ts_msps: VecDeque) { + info!("found ts_msps {ts_msps:?}"); + self.ts_msps = ts_msps; + // Find the largest MSP which can potentially contain some event before the range. + let befores: Vec<_> = self + .ts_msps + .iter() + .map(|x| *x) + .filter(|x| *x < self.range.beg) + .collect(); + if befores.len() >= 1 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + [befores[befores.len() - 1]].into(), + false, + self.scy.clone(), + ); + self.state = FrState::ReadBack1(st); + } else if self.ts_msps.len() >= 1 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.ts_msps.clone(), + true, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + } else { + self.state = FrState::Done; + } + } + + fn back_1_done(&mut self, item: Box) -> Option> { + info!("back_1_done len {}", item.len()); + if item.len() == 0 { + // Find the 2nd largest MSP which can potentially contain some event before the range. + let befores: Vec<_> = self + .ts_msps + .iter() + .map(|x| *x) + .filter(|x| *x < self.range.beg) + .collect(); + if befores.len() >= 2 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + [befores[befores.len() - 2]].into(), + false, + self.scy.clone(), + ); + self.state = FrState::ReadBack2(st); + None + } else if self.ts_msps.len() >= 1 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.ts_msps.clone(), + true, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + None + } else { + self.state = FrState::Done; + None + } + } else { + if self.ts_msps.len() > 0 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.ts_msps.clone(), + true, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + Some(item) + } else { + self.state = FrState::Done; + Some(item) + } + } + } + + fn back_2_done(&mut self, item: Box) -> Option> { + info!("back_2_done len {}", item.len()); + if self.ts_msps.len() >= 1 { + let st = ReadValues::new( + self.series as i64, + self.scalar_type.clone(), + self.shape.clone(), + self.range.clone(), + self.ts_msps.clone(), + true, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + } else { + self.state = FrState::Done; + } + if item.len() > 0 { + Some(item) + } else { + None + } + } +} + +impl Stream for EventsStreamScylla { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if self.do_test_stream_error { + let e = Error::with_msg(format!("Test PRIVATE STREAM error.")) + .add_public_msg(format!("Test PUBLIC STREAM error.")); + return Ready(Some(Err(e))); + } + loop { + break match self.state { + FrState::New => { + let fut = find_ts_msp(self.series as i64, self.range.clone(), self.scy.clone()); + let fut = Box::pin(fut); + self.state = FrState::FindMsp(fut); + continue; + } + FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) { + Ready(Ok(ts_msps)) => { + self.ts_msps_found(ts_msps); + continue; + } + Ready(Err(e)) => { + self.state = FrState::Done; + Ready(Some(Err(e))) + } + Pending => Pending, + }, + FrState::ReadBack1(ref mut st) => match st.fut.poll_unpin(cx) { + Ready(Ok(item)) => { + if let Some(item) = self.back_1_done(item) { + item.verify(); + item.output_info(); + Ready(Some(Ok(ChannelEvents::Events(item)))) + } else { + continue; + } + } + Ready(Err(e)) => { + self.state = FrState::Done; + Ready(Some(Err(e))) + } + Pending => Pending, + }, + FrState::ReadBack2(ref mut st) => match st.fut.poll_unpin(cx) { + Ready(Ok(item)) => { + if let Some(item) = self.back_2_done(item) { + item.verify(); + item.output_info(); + Ready(Some(Ok(ChannelEvents::Events(item)))) + } else { + continue; + } + } + Ready(Err(e)) => { + self.state = FrState::Done; + Ready(Some(Err(e))) + } + Pending => Pending, + }, + FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { + Ready(Ok(item)) => { + info!("read values"); + item.verify(); + item.output_info(); + if !st.next() { + info!("ReadValues exhausted"); + self.state = FrState::Done; + } + Ready(Some(Ok(ChannelEvents::Events(item)))) + } + Ready(Err(e)) => Ready(Some(Err(e))), + Pending => Pending, + }, + FrState::Done => Ready(None), + }; + } + } +} + +async fn find_series(channel: &Channel, pgclient: Arc) -> Result<(u64, ScalarType, Shape), Error> { + info!("find_series channel {:?}", channel); + let rows = if let Some(series) = channel.series() { + let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; + pgclient.query(q, &[&(series as i64)]).await.err_conv()? + } else { + let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2"; + pgclient + .query(q, &[&channel.backend(), &channel.name()]) + .await + .err_conv()? + }; + if rows.len() < 1 { + return Err(Error::with_public_msg_no_trace(format!( + "No series found for {channel:?}" + ))); + } + if rows.len() > 1 { + error!("Multiple series found for {channel:?}"); + return Err(Error::with_public_msg_no_trace( + "Multiple series found for channel, can not return data for ambiguous series", + )); + } + let row = rows + .into_iter() + .next() + .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?; + let series = row.get::<_, i64>(0) as u64; + let _facility: String = row.get(1); + let _channel: String = row.get(2); + let a: i32 = row.get(3); + let scalar_type = ScalarType::from_scylla_i32(a)?; + let a: Vec = row.get(4); + let shape = Shape::from_scylla_shape_dims(&a)?; + Ok((series, scalar_type, shape)) +} + +async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { + trace!("find_ts_msp series {} {:?}", series, range); + // TODO use prepared statements + let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2"; + let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?; + let mut before = VecDeque::new(); + for row in res.rows_typed_or_empty::<(i64,)>() { + let row = row.err_conv()?; + before.push_front(row.0 as u64); + } + let cql = "select ts_msp from ts_msp where series = ? and ts_msp > ? and ts_msp < ?"; + let res = scy + .query(cql, (series, range.beg as i64, range.end as i64)) + .await + .err_conv()?; + let mut ret = VecDeque::new(); + for h in before { + ret.push_back(h); + } + for row in res.rows_typed_or_empty::<(i64,)>() { + let row = row.err_conv()?; + ret.push_back(row.0 as u64); + } + trace!("found in total {} rows", ret.len()); + Ok(ret) +} + +macro_rules! read_next_scalar_values { + ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { + async fn $fname( + series: i64, + ts_msp: u64, + range: NanoRange, + fwd: bool, + scy: Arc, + ) -> Result, Error> { + type ST = $st; + type SCYTY = $scyty; + if ts_msp >= range.end { + warn!("given ts_msp {} >= range.end {}", ts_msp, range.end); + } + if range.end > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); + } + let res = if fwd { + let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; + trace!( + "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", + ts_msp, + ts_lsp_min, + ts_lsp_max, + stringify!($fname) + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" + ); + scy.query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64)) + .await + .err_conv()? + } else { + let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + info!( + "BCK ts_msp {} ts_lsp_max {} range beg {} end {} {}", + ts_msp, + ts_lsp_max, + range.beg, + range.end, + stringify!($fname) + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" + ); + scy.query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) + .await + .err_conv()? + }; + let mut ret = EventsDim0::::empty(); + for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = row.2 as ST; + ret.push(ts, pulse, value); + } + trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); + Ok(ret) + } + }; +} + +macro_rules! read_next_array_values { + ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { + async fn $fname( + series: i64, + ts_msp: u64, + _range: NanoRange, + _fwd: bool, + scy: Arc, + ) -> Result, Error> { + // TODO change return type: so far EventsDim1 does not exist. + error!("TODO read_next_array_values"); + err::todo(); + if true { + return Err(Error::with_msg_no_trace("redo based on scalar case")); + } + type ST = $st; + type _SCYTY = $scyty; + info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ?" + ); + let _res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?; + let ret = EventsDim0::::empty(); + /* + for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = row.2.into_iter().map(|x| x as ST).collect(); + ret.push(ts, pulse, value); + } + */ + info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); + Ok(ret) + } + }; +} + +read_next_scalar_values!(read_next_values_scalar_i8, i8, i8, "events_scalar_i8"); +read_next_scalar_values!(read_next_values_scalar_i16, i16, i16, "events_scalar_i16"); +read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32"); +read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); +read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); + +read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16"); + +pub async fn make_scylla_stream( + evq: &PlainEventsQuery, + scy: Arc, + pgclient: &Arc, + do_test_stream_error: bool, +) -> Result> + Send>>, Error> { + // Need to know details about the series in order to fetch data: + // TODO should query already contain ScalarType and Shape? + let (series, scalar_type, shape) = { find_series(evq.channel(), pgclient.clone()).await? }; + let res = Box::pin(EventsStreamScylla::new( + series, + evq, + scalar_type, + shape, + scy, + do_test_stream_error, + )) as _; + Ok(res) +} + +pub async fn channel_state_events( + evq: &ChannelStateEvents, + scy: Arc, + _dbconf: Database, +) -> Result, Error> { + let mut ret = Vec::new(); + let div = DAY; + let mut ts_msp = evq.range().beg / div * div; + loop { + let series = (evq + .channel() + .series() + .ok_or(Error::with_msg_no_trace(format!("series id not given"))))?; + let params = (series as i64, ts_msp as i64); + let mut res = scy + .query_iter( + "select ts_lsp, kind from channel_status where series = ? and ts_msp = ?", + params, + ) + .await + .err_conv()?; + while let Some(row) = res.next().await { + let row = row.err_conv()?; + let (ts_lsp, kind): (i64, i32) = row.into_typed().err_conv()?; + let ts = ts_msp + ts_lsp as u64; + let kind = kind as u32; + if ts >= evq.range().beg && ts < evq.range().end { + ret.push((ts, kind)); + } + } + ts_msp += DAY; + if ts_msp >= evq.range().end { + break; + } + } + Ok(ret) +} diff --git a/scyllaconn/src/scyllaconn.rs b/scyllaconn/src/scyllaconn.rs new file mode 100644 index 0000000..6493f60 --- /dev/null +++ b/scyllaconn/src/scyllaconn.rs @@ -0,0 +1,20 @@ +pub mod bincache; +pub mod errconv; +pub mod events; + +use err::Error; +use errconv::ErrConv; +use netpod::ScyllaConfig; +use scylla::Session as ScySession; +use std::sync::Arc; + +pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result, Error> { + let scy = scylla::SessionBuilder::new() + .known_nodes(&scyconf.hosts) + .use_keyspace(&scyconf.keyspace, true) + .build() + .await + .err_conv()?; + let ret = Arc::new(scy); + Ok(ret) +}