Refactor (WIP) the event fetch pipeline

Dominik Werder
2022-09-01 18:53:00 +02:00
parent 904faeffa3
commit 4a3f8986fe
28 changed files with 3239 additions and 239 deletions

View File

@@ -1,5 +1,5 @@
[workspace]
members = ["daqbuffer", "h5out", "items", "items_proc", "nodenet", "httpclient", "fsio", "dq"]
members = ["daqbuffer", "httpret", "h5out", "items", "items_2", "items_proc", "nodenet", "httpclient", "fsio", "dq"]
[profile.release]
opt-level = 1

View File

@@ -3,12 +3,11 @@ mod channelarchiver;
use crate::err::ErrConv;
use crate::nodes::{require_sls_test_host_running, require_test_hosts_running};
use chrono::{DateTime, Utc};
use disk::events::PlainEventsQuery;
use err::Error;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::query::{BinnedQuery, CacheUsage};
use netpod::query::{BinnedQuery, CacheUsage, PlainEventsQuery};
use netpod::{f64_close, AppendToUrl};
use netpod::{AggKind, Channel, Cluster, NanoRange, APP_JSON};
use serde::{Deserialize, Serialize};

View File

@@ -1,7 +1,6 @@
use crate::err::ErrConv;
use crate::nodes::require_test_hosts_running;
use chrono::{DateTime, Utc};
use disk::events::PlainEventsQuery;
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use disk::streamlog::Streamlog;
use err::Error;
@@ -12,6 +11,7 @@ use items::numops::NumOps;
use items::scalarevents::ScalarEvents;
use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen};
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET};
use serde_json::Value as JsonValue;
use std::fmt::Debug;

View File

@@ -10,19 +10,19 @@ path = "src/dbconn.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tokio = { version = "1.20.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] }
tracing = "0.1.25"
crc32fast = "1.2.1"
arrayref = "0.3.6"
byteorder = "1.4.3"
futures-core = "0.3.14"
futures-util = "0.3.14"
bytes = "1.0.1"
pin-project = "1.0.7"
byteorder = "1.4"
futures-core = "0.3.24"
futures-util = "0.3.24"
bytes = "1.2"
pin-project = "1"
#async-channel = "1"
#dashmap = "3"
tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
scylla = "0.4.4"
scylla = "0.5"
async-channel = "1.6"
chrono = "0.4"
regex = "1.5.4"

View File

@@ -3,7 +3,6 @@ use crate::decode::{
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
LittleEndian, NumFromBytes,
};
use crate::events::PlainEventsQuery;
use crate::merge::mergedfromremotes::MergedFromRemotes;
use bytes::Bytes;
use err::Error;
@@ -18,7 +17,7 @@ use items::{
TimeBinnableType,
};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::query::{PlainEventsQuery, RawEventsQuery};
use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape};
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;

View File

@@ -9,7 +9,6 @@ pub mod dataopen;
pub mod decode;
pub mod eventblobs;
pub mod eventchunker;
pub mod events;
pub mod frame;
pub mod gen;
pub mod index;

View File

@@ -1,188 +0,0 @@
use chrono::{DateTime, TimeZone, Utc};
use err::Error;
use netpod::get_url_query_pairs;
use netpod::{AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos};
use std::time::Duration;
use url::Url;
// TODO move this query type out of this `binned` mod
#[derive(Clone, Debug)]
pub struct PlainEventsQuery {
channel: Channel,
range: NanoRange,
disk_io_buffer_size: usize,
report_error: bool,
timeout: Duration,
events_max: Option<u64>,
do_log: bool,
do_test_main_error: bool,
do_test_stream_error: bool,
}
impl PlainEventsQuery {
pub fn new(
channel: Channel,
range: NanoRange,
disk_io_buffer_size: usize,
events_max: Option<u64>,
do_log: bool,
) -> Self {
Self {
channel,
range,
disk_io_buffer_size,
report_error: false,
timeout: Duration::from_millis(10000),
events_max,
do_log,
do_test_main_error: false,
do_test_stream_error: false,
}
}
pub fn from_request_head(head: &http::request::Parts) -> Result<Self, Error> {
let s1 = format!("dummy:{}", head.uri);
let url = Url::parse(&s1)?;
Self::from_url(&url)
}
pub fn channel(&self) -> &Channel {
&self.channel
}
pub fn range(&self) -> &NanoRange {
&self.range
}
pub fn report_error(&self) -> bool {
self.report_error
}
pub fn disk_io_buffer_size(&self) -> usize {
self.disk_io_buffer_size
}
pub fn timeout(&self) -> Duration {
self.timeout
}
pub fn events_max(&self) -> Option<u64> {
self.events_max
}
pub fn do_log(&self) -> bool {
self.do_log
}
pub fn do_test_main_error(&self) -> bool {
self.do_test_main_error
}
pub fn do_test_stream_error(&self) -> bool {
self.do_test_stream_error
}
pub fn set_series_id(&mut self, series: u64) {
self.channel.series = Some(series);
}
pub fn set_timeout(&mut self, k: Duration) {
self.timeout = k;
}
pub fn set_do_test_main_error(&mut self, k: bool) {
self.do_test_main_error = k;
}
pub fn set_do_test_stream_error(&mut self, k: bool) {
self.do_test_stream_error = k;
}
}
impl HasBackend for PlainEventsQuery {
fn backend(&self) -> &str {
&self.channel.backend
}
}
impl HasTimeout for PlainEventsQuery {
fn timeout(&self) -> Duration {
self.timeout.clone()
}
}
impl FromUrl for PlainEventsQuery {
fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &std::collections::BTreeMap<String, String>) -> Result<Self, Error> {
let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?;
let ret = Self {
channel: Channel::from_pairs(&pairs)?,
range: NanoRange {
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
},
disk_io_buffer_size: pairs
.get("diskIoBufferSize")
.map_or("4096", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse diskIoBufferSize {:?}", e)))?,
report_error: pairs
.get("reportError")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?,
timeout: pairs
.get("timeout")
.map_or("10000", |k| k)
.parse::<u64>()
.map(|k| Duration::from_millis(k))
.map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?,
events_max: pairs
.get("eventsMax")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
do_log: pairs
.get("doLog")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?,
do_test_main_error: pairs
.get("doTestMainError")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError {:?}", e)))?,
do_test_stream_error: pairs
.get("doTestStreamError")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?,
};
Ok(ret)
}
}
impl AppendToUrl for PlainEventsQuery {
fn append_to_url(&self, url: &mut Url) {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
self.channel.append_to_url(url);
let mut g = url.query_pairs_mut();
g.append_pair(
"begDate",
&Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
);
g.append_pair(
"endDate",
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
);
g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size));
g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
if let Some(x) = self.events_max.as_ref() {
g.append_pair("eventsMax", &format!("{}", x));
}
g.append_pair("doLog", &format!("{}", self.do_log));
}
}
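Note: the type is not dropped for good; an almost identical definition is re-added to `netpod::query` further below, minus the `from_request_head` helper, which the HTTP handlers now inline (see the sketch after that hunk).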

View File

@@ -29,10 +29,12 @@ dbconn = { path = "../dbconn" }
tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
disk = { path = "../disk" }
items = { path = "../items" }
items_2 = { path = "../items_2" }
parse = { path = "../parse" }
nodenet = { path = "../nodenet" }
commonio = { path = "../commonio" }
taskrun = { path = "../taskrun" }
scylla = "0.4"
scyllaconn = { path = "../scyllaconn" }
scylla = "0.5"
md-5 = "0.9"
regex = "1.6"

View File

@@ -2,12 +2,11 @@ use crate::err::Error;
use crate::{response, ToPublicResponse};
use dbconn::{create_connection, create_scylla_connection};
use disk::binned::query::PreBinnedQuery;
use disk::events::PlainEventsQuery;
use futures_util::StreamExt;
use http::{Method, Request, Response, StatusCode};
use hyper::Body;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::query::{BinnedQuery, PlainEventsQuery};
use netpod::timeunits::*;
use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape};
use netpod::{ChannelConfigResponse, NodeConfigCached};

View File

@@ -1,13 +1,19 @@
use std::sync::Arc;
use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json};
use crate::err::Error;
use crate::{response, response_err, BodyStream, ToPublicResponse};
use disk::events::PlainEventsQuery;
use futures_util::{StreamExt, TryStreamExt};
use http::{Method, Request, Response, StatusCode};
use hyper::Body;
use items_2::ChannelEvents;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::{AggKind, FromUrl, NodeConfigCached};
use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
use scyllaconn::create_scy_session;
use scyllaconn::errconv::ErrConv;
use scyllaconn::events::make_scylla_stream;
use url::Url;
pub struct EventsHandler {}
@@ -83,7 +89,9 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("httpret plain_events_json req: {:?}", req);
let (head, _body) = req.into_parts();
let query = PlainEventsQuery::from_request_head(&head)?;
let s1 = format!("dummy:{}", head.uri);
let url = Url::parse(&s1)?;
let query = PlainEventsQuery::from_url(&url)?;
let chconf = chconf_from_events_json(&query, node_config).await?;
// Update the series id since we don't require some unique identifier yet.
@@ -118,3 +126,103 @@ async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -
))?;
Ok(ret)
}
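// Note on the `format!("dummy:{}", head.uri)` idiom above: `Url::parse`
// rejects relative references, and `head.uri` carries only the path and
// query for origin-form requests ("/api/4/events?..."), so an arbitrary
// scheme is prefixed purely to make the query pairs parseable.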
pub struct EventsHandlerScylla {}
impl EventsHandlerScylla {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/scylla/events" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
match self.fetch(req, node_config).await {
Ok(ret) => Ok(ret),
Err(e) => Ok(e.to_public_response()),
}
}
async fn fetch(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("EventsHandlerScylla req: {:?}", req);
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
Ok(self.gather(req, node_config).await?)
} else {
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
Ok(ret)
}
}
async fn gather(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
warn!("TODO PlainEventsQuery needs to take AggKind");
let s1 = format!("dummy:{}", head.uri);
let url = Url::parse(&s1)?;
let evq = PlainEventsQuery::from_url(&url)?;
let pgclient = {
// TODO use common connection/pool:
info!("--------------- open postgres connection");
let pgconf = &node_config.node_config.cluster.database;
let u = {
let d = &pgconf;
format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name)
};
let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?;
tokio::spawn(pgconn);
let pgclient = Arc::new(pgclient);
pgclient
};
let mut stream = if let Some(scyco) = &node_config.node_config.cluster.scylla {
let scy = create_scy_session(scyco).await?;
let stream = make_scylla_stream(&evq, scy, &pgclient, false).await?;
stream
} else {
return Err(Error::with_public_msg(format!("no scylla configured")));
};
let mut coll = None;
while let Some(item) = stream.next().await {
match item {
Ok(k) => match k {
ChannelEvents::Events(mut item) => {
if coll.is_none() {
coll = Some(item.new_collector());
}
let cl = coll.as_mut().unwrap();
cl.ingest(item.as_collectable_mut());
}
ChannelEvents::Status(..) => {}
ChannelEvents::RangeComplete => {}
},
Err(e) => {
return Err(e.into());
}
}
}
match coll {
Some(mut coll) => {
let res = coll.result()?;
let res = res.to_json_result()?;
let res = res.to_json_bytes()?;
let ret = response(StatusCode::OK).body(Body::from(res))?;
Ok(ret)
}
None => {
let ret = response(StatusCode::OK).body(BodyStream::wrapped(
futures_util::stream::iter([Ok(Vec::new())]),
format!("EventsHandlerScylla::gather"),
))?;
Ok(ret)
}
}
}
}
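The new `/api/4/scylla/events` route accepts the same query format as `/api/4/events`; a minimal client-side sketch, with the base URL as a placeholder (the channel keys come from `Channel::append_to_url`, the rest from `PlainEventsQuery::append_to_url` as shown in the `netpod::query` hunk below):

    let mut url = Url::parse("http://localhost:8080/api/4/scylla/events")?;
    // Appends the channel keys plus begDate, endDate, diskIoBufferSize,
    // timeout, eventsMax and doLog.
    query.append_to_url(&mut url);
    // Send with `Accept: application/json` (or `*/*`), as checked in fetch().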

View File

@@ -9,7 +9,6 @@ use disk::decode::Endianness;
use disk::decode::EventValueFromBytes;
use disk::decode::EventValueShape;
use disk::decode::NumFromBytes;
use disk::events::PlainEventsQuery;
use disk::merge::mergedfromremotes::MergedFromRemotes;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -26,15 +25,8 @@ use items::PushableIndex;
use items::Sitemty;
use items::TimeBinnableType;
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::AggKind;
use netpod::Channel;
use netpod::FromUrl;
use netpod::NanoRange;
use netpod::NodeConfigCached;
use netpod::PerfOpts;
use netpod::ScalarType;
use netpod::Shape;
use netpod::query::{PlainEventsQuery, RawEventsQuery};
use netpod::{AggKind, Channel, FromUrl, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape};
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::pin::Pin;

View File

@@ -245,6 +245,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
h.handle(req, &node_config).await
} else if let Some(h) = events::EventsHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = events::EventsHandlerScylla::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = ChannelStatusConnectionEvents::handler(&req) {
h.handle(req, &node_config).await
} else if path == "/api/4/binned" {

View File

@@ -5,7 +5,6 @@ use crate::err::Error;
use crate::gather::{gather_get_json_generic, SubRes};
use crate::pulsemap::MapPulseQuery;
use crate::{api_1_docs, api_4_docs, response, response_err, Cont};
use disk::events::PlainEventsQuery;
use futures_core::Stream;
use futures_util::pin_mut;
use http::{Method, StatusCode};
@@ -14,7 +13,7 @@ use hyper::{Body, Request, Response, Server};
use hyper_tls::HttpsConnector;
use itertools::Itertools;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::query::{BinnedQuery, PlainEventsQuery};
use netpod::{
AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, FromUrl,
HasBackend, HasTimeout, ProxyConfig, ACCEPT_ALL, APP_JSON,

View File

@@ -13,11 +13,11 @@ serde_json = "1.0"
serde_cbor = "0.11.1"
erased-serde = "0.3"
bincode = "1.3.3"
bytes = "1.0.1"
num-traits = "0.2.14"
tokio = { version = "1.7.1", features = ["fs"] }
chrono = { version = "0.4.19", features = ["serde"] }
crc32fast = "1.2.1"
bytes = "1.2.1"
num-traits = "0.2.15"
chrono = { version = "0.4.22", features = ["serde"] }
crc32fast = "1.3.2"
tokio = { version = "1.20.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
err = { path = "../err" }
items_proc = { path = "../items_proc" }
netpod = { path = "../netpod" }

View File

@@ -1,11 +0,0 @@
pub enum ConnStatus {}
pub struct ConnStatusEvent {
ts: u64,
status: ConnStatus,
}
pub enum ChannelEvents {
Status(ConnStatus),
Data(),
}

View File

@@ -1,5 +1,4 @@
pub mod binnedevents;
pub mod binnernew;
pub mod binsdim0;
pub mod binsdim1;
pub mod eventsitem;

items_2/Cargo.toml Normal file
View File

@@ -0,0 +1,22 @@
[package]
name = "items_2"
version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/items_2.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.2"
erased-serde = "0.3"
bytes = "1.2.1"
num-traits = "0.2.15"
chrono = { version = "0.4.19", features = ["serde"] }
crc32fast = "1.3.2"
futures-util = "0.3.24"
err = { path = "../err" }
items_proc = { path = "../items_proc" }
netpod = { path = "../netpod" }

items_2/src/binsdim0.rs Normal file
View File

@@ -0,0 +1,538 @@
use crate::streams::{CollectableType, CollectorType, ToJsonResult};
use crate::{
ts_offs_from_abs, AppendEmptyBin, Empty, IsoDateTime, ScalarOps, TimeBinnable, TimeBinnableType,
TimeBinnableTypeAggregator, TimeBinned, TimeBinner, TimeSeries, WithLen,
};
use chrono::{TimeZone, Utc};
use err::Error;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use num_traits::Zero;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::{fmt, mem};
#[derive(Clone, Serialize, Deserialize)]
pub struct MinMaxAvgDim0Bins<NTY> {
pub ts1s: Vec<u64>,
pub ts2s: Vec<u64>,
pub counts: Vec<u64>,
pub mins: Vec<NTY>,
pub maxs: Vec<NTY>,
pub avgs: Vec<f32>,
}
impl<NTY> fmt::Debug for MinMaxAvgDim0Bins<NTY>
where
NTY: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let self_name = std::any::type_name::<Self>();
write!(
fmt,
"{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
self.ts1s.len(),
self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.counts,
self.mins,
self.maxs,
self.avgs,
)
}
}
impl<NTY> MinMaxAvgDim0Bins<NTY> {
pub fn empty() -> Self {
Self {
ts1s: vec![],
ts2s: vec![],
counts: vec![],
mins: vec![],
maxs: vec![],
avgs: vec![],
}
}
}
impl<NTY> WithLen for MinMaxAvgDim0Bins<NTY> {
fn len(&self) -> usize {
self.ts1s.len()
}
}
impl<NTY> Empty for MinMaxAvgDim0Bins<NTY> {
fn empty() -> Self {
Self {
ts1s: Vec::new(),
ts2s: Vec::new(),
counts: Vec::new(),
mins: Vec::new(),
maxs: Vec::new(),
avgs: Vec::new(),
}
}
}
impl<NTY: ScalarOps> AppendEmptyBin for MinMaxAvgDim0Bins<NTY> {
fn append_empty_bin(&mut self, ts1: u64, ts2: u64) {
self.ts1s.push(ts1);
self.ts2s.push(ts2);
self.counts.push(0);
self.mins.push(NTY::zero());
self.maxs.push(NTY::zero());
self.avgs.push(0.);
}
}
impl<NTY: ScalarOps> TimeSeries for MinMaxAvgDim0Bins<NTY> {
fn ts_min(&self) -> Option<u64> {
todo!("collection of bins can not be TimeSeries")
}
fn ts_max(&self) -> Option<u64> {
todo!("collection of bins can not be TimeSeries")
}
fn ts_min_max(&self) -> Option<(u64, u64)> {
todo!("collection of bins can not be TimeSeries")
}
}
impl<NTY: ScalarOps> TimeBinnableType for MinMaxAvgDim0Bins<NTY> {
type Output = MinMaxAvgDim0Bins<NTY>;
type Aggregator = MinMaxAvgDim0BinsAggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let self_name = std::any::type_name::<Self>();
debug!(
"TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, do_time_weight)
}
}
pub struct MinMaxAvgBinsCollected<NTY> {
_m1: PhantomData<NTY>,
}
impl<NTY> MinMaxAvgBinsCollected<NTY> {
pub fn new() -> Self {
Self { _m1: PhantomData }
}
}
#[derive(Serialize)]
pub struct MinMaxAvgBinsCollectedResult<NTY> {
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "tsMs")]
ts_off_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
counts: Vec<u64>,
mins: Vec<NTY>,
maxs: Vec<NTY>,
avgs: Vec<f32>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
finalised_range: bool,
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
missing_bins: u32,
#[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
continue_at: Option<IsoDateTime>,
}
impl<NTY: ScalarOps> ToJsonResult for MinMaxAvgBinsCollectedResult<NTY> {
fn to_json_result(&self) -> Result<Box<dyn crate::streams::ToJsonBytes>, Error> {
let k = serde_json::to_value(self)?;
Ok(Box::new(k))
}
}
pub struct MinMaxAvgBinsCollector<NTY> {
timed_out: bool,
range_complete: bool,
vals: MinMaxAvgDim0Bins<NTY>,
}
impl<NTY> MinMaxAvgBinsCollector<NTY> {
pub fn new() -> Self {
Self {
timed_out: false,
range_complete: false,
vals: MinMaxAvgDim0Bins::<NTY>::empty(),
}
}
}
impl<NTY> WithLen for MinMaxAvgBinsCollector<NTY> {
fn len(&self) -> usize {
self.vals.ts1s.len()
}
}
impl<NTY: ScalarOps> CollectorType for MinMaxAvgBinsCollector<NTY> {
type Input = MinMaxAvgDim0Bins<NTY>;
type Output = MinMaxAvgBinsCollectedResult<NTY>;
fn ingest(&mut self, _src: &mut Self::Input) {
err::todo();
}
fn set_range_complete(&mut self) {
self.range_complete = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, Error> {
let bin_count = self.vals.ts1s.len() as u32;
// TODO could save the copy:
let mut ts_all = self.vals.ts1s.clone();
if self.vals.ts2s.len() > 0 {
ts_all.push(*self.vals.ts2s.last().unwrap());
}
info!("TODO return proper continueAt");
let bin_count_exp = 100 as u32;
let continue_at = if self.vals.ts1s.len() < bin_count_exp as usize {
match ts_all.last() {
Some(&k) => {
let iso = IsoDateTime(Utc.timestamp_nanos(k as i64));
Some(iso)
}
None => Err(Error::with_msg("partial_content but no bin in result"))?,
}
} else {
None
};
let tst = ts_offs_from_abs(&ts_all);
let counts = mem::replace(&mut self.vals.counts, Vec::new());
let mins = mem::replace(&mut self.vals.mins, Vec::new());
let maxs = mem::replace(&mut self.vals.maxs, Vec::new());
let avgs = mem::replace(&mut self.vals.avgs, Vec::new());
let ret = MinMaxAvgBinsCollectedResult::<NTY> {
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
counts,
mins,
maxs,
avgs,
finalised_range: self.range_complete,
missing_bins: bin_count_exp - bin_count,
continue_at,
};
Ok(ret)
}
}
impl<NTY: ScalarOps> CollectableType for MinMaxAvgDim0Bins<NTY> {
type Collector = MinMaxAvgBinsCollector<NTY>;
fn new_collector() -> Self::Collector {
Self::Collector::new()
}
}
pub struct MinMaxAvgDim0BinsAggregator<NTY> {
range: NanoRange,
count: u64,
min: NTY,
max: NTY,
// Carry over to next bin:
avg: f32,
sumc: u64,
sum: f32,
}
impl<NTY: ScalarOps> MinMaxAvgDim0BinsAggregator<NTY> {
pub fn new(range: NanoRange, _do_time_weight: bool) -> Self {
Self {
range,
count: 0,
min: NTY::zero(),
max: NTY::zero(),
avg: 0.,
sumc: 0,
sum: 0f32,
}
}
}
impl<NTY: ScalarOps> TimeBinnableTypeAggregator for MinMaxAvgDim0BinsAggregator<NTY> {
type Input = MinMaxAvgDim0Bins<NTY>;
type Output = MinMaxAvgDim0Bins<NTY>;
fn range(&self) -> &NanoRange {
&self.range
}
fn ingest(&mut self, item: &Self::Input) {
for i1 in 0..item.ts1s.len() {
if item.counts[i1] == 0 {
} else if item.ts2s[i1] <= self.range.beg {
} else if item.ts1s[i1] >= self.range.end {
} else {
if self.count == 0 {
self.min = item.mins[i1].clone();
self.max = item.maxs[i1].clone();
} else {
if self.min > item.mins[i1] {
self.min = item.mins[i1].clone();
}
if self.max < item.maxs[i1] {
self.max = item.maxs[i1].clone();
}
}
self.count += item.counts[i1];
self.sum += item.avgs[i1];
self.sumc += 1;
}
}
}
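// Note: `sumc` counts contributing input bins, not events, so the `avg`
// produced in `result_reset` below is the unweighted mean of the input
// bins' averages (each bin's avg enters with weight 1 regardless of count).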
fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
if self.sumc > 0 {
self.avg = self.sum / self.sumc as f32;
}
let ret = Self::Output {
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![self.min.clone()],
maxs: vec![self.max.clone()],
avgs: vec![self.avg],
};
self.range = range;
self.count = 0;
self.sum = 0f32;
self.sumc = 0;
ret
}
}
impl<NTY: ScalarOps> TimeBinnable for MinMaxAvgDim0Bins<NTY> {
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinner> {
let ret = MinMaxAvgDim0BinsTimeBinner::<NTY>::new(edges.into(), do_time_weight);
Box::new(ret)
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
pub struct MinMaxAvgDim0BinsTimeBinner<NTY: ScalarOps> {
edges: VecDeque<u64>,
do_time_weight: bool,
agg: Option<MinMaxAvgDim0BinsAggregator<NTY>>,
ready: Option<<MinMaxAvgDim0BinsAggregator<NTY> as TimeBinnableTypeAggregator>::Output>,
}
impl<NTY: ScalarOps> MinMaxAvgDim0BinsTimeBinner<NTY> {
fn new(edges: VecDeque<u64>, do_time_weight: bool) -> Self {
Self {
edges,
do_time_weight,
agg: None,
ready: None,
}
}
fn next_bin_range(&mut self) -> Option<NanoRange> {
if self.edges.len() >= 2 {
let ret = NanoRange {
beg: self.edges[0],
end: self.edges[1],
};
self.edges.pop_front();
Some(ret)
} else {
None
}
}
}
impl<NTY: ScalarOps> TimeBinner for MinMaxAvgDim0BinsTimeBinner<NTY> {
fn ingest(&mut self, item: &dyn TimeBinnable) {
let self_name = std::any::type_name::<Self>();
if item.len() == 0 {
// Return already here, RangeOverlapInfo would not give much sense.
return;
}
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} no more bin in edges A");
return;
}
// TODO optimize by remembering at which event array index we have arrived.
// That needs modified interfaces which can take and yield the start and latest index.
loop {
while item.starts_after(NanoRange {
beg: 0,
end: self.edges[1],
}) {
self.cycle();
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} no more bin in edges B");
return;
}
}
if item.ends_before(NanoRange {
beg: self.edges[0],
end: u64::MAX,
}) {
return;
} else {
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} edge list exhausted");
return;
} else {
let agg = if let Some(agg) = self.agg.as_mut() {
agg
} else {
self.agg = Some(MinMaxAvgDim0BinsAggregator::new(
// We know here that we have enough edges for another bin.
// and `next_bin_range` will pop the first edge.
self.next_bin_range().unwrap(),
self.do_time_weight,
));
self.agg.as_mut().unwrap()
};
if let Some(item) = item
.as_any()
// TODO make statically sure that we attempt to cast to the correct type here:
.downcast_ref::<<MinMaxAvgDim0BinsAggregator<NTY> as TimeBinnableTypeAggregator>::Input>()
{
agg.ingest(item);
} else {
let tyid_item = std::any::Any::type_id(item.as_any());
error!("not correct item type {:?}", tyid_item);
};
if item.ends_after(agg.range().clone()) {
self.cycle();
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {self_name} no more bin in edges C");
return;
}
} else {
break;
}
}
}
}
}
fn bins_ready_count(&self) -> usize {
match &self.ready {
Some(k) => k.len(),
None => 0,
}
}
fn bins_ready(&mut self) -> Option<Box<dyn crate::TimeBinned>> {
match self.ready.take() {
Some(k) => Some(Box::new(k)),
None => None,
}
}
// TODO there is too much common code between implementors:
fn push_in_progress(&mut self, push_empty: bool) {
// TODO expand should be derived from AggKind. Is it still required after all?
let expand = true;
if let Some(agg) = self.agg.as_mut() {
let dummy_range = NanoRange { beg: 4, end: 5 };
let bins = agg.result_reset(dummy_range, expand);
self.agg = None;
assert_eq!(bins.len(), 1);
if push_empty || bins.counts[0] != 0 {
match self.ready.as_mut() {
Some(_ready) => {
err::todo();
//ready.append(&mut bins);
}
None => {
self.ready = Some(bins);
}
}
}
}
}
// TODO there is too much common code between implementors:
fn cycle(&mut self) {
let n = self.bins_ready_count();
self.push_in_progress(true);
if self.bins_ready_count() == n {
if let Some(_range) = self.next_bin_range() {
let bins = MinMaxAvgDim0Bins::<NTY>::empty();
err::todo();
//bins.append_zero(range.beg, range.end);
match self.ready.as_mut() {
Some(_ready) => {
err::todo();
//ready.append(&mut bins);
}
None => {
self.ready = Some(bins);
}
}
if self.bins_ready_count() <= n {
error!("failed to push a zero bin");
}
} else {
warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin");
}
}
}
}
impl<NTY: ScalarOps> TimeBinned for MinMaxAvgDim0Bins<NTY> {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable {
self as &dyn TimeBinnable
}
fn edges_slice(&self) -> (&[u64], &[u64]) {
(&self.ts1s[..], &self.ts2s[..])
}
fn counts(&self) -> &[u64] {
&self.counts[..]
}
fn mins(&self) -> Vec<f32> {
self.mins.iter().map(|x| x.clone().as_prim_f32()).collect()
}
fn maxs(&self) -> Vec<f32> {
self.maxs.iter().map(|x| x.clone().as_prim_f32()).collect()
}
fn avgs(&self) -> Vec<f32> {
self.avgs.clone()
}
fn validate(&self) -> Result<(), String> {
use std::fmt::Write;
let mut msg = String::new();
if self.ts1s.len() != self.ts2s.len() {
write!(&mut msg, "ts1s ≠ ts2s\n").unwrap();
}
for (i, ((count, min), max)) in self.counts.iter().zip(&self.mins).zip(&self.maxs).enumerate() {
if min.as_prim_f32() < 1. && *count != 0 {
write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap();
}
}
if msg.is_empty() {
Ok(())
} else {
Err(msg)
}
}
}

items_2/src/eventsdim0.rs Normal file
View File

@@ -0,0 +1,660 @@
use crate::binsdim0::MinMaxAvgDim0Bins;
use crate::streams::{CollectableType, CollectorType, ToJsonResult};
use crate::{pulse_offs_from_abs, ts_offs_from_abs};
use crate::{
Empty, Events, ScalarOps, TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner, TimeSeries,
WithLen,
};
use err::Error;
use netpod::log::*;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::VecDeque;
use std::{fmt, mem};
// TODO in this module reduce clones.
#[derive(Serialize, Deserialize)]
pub struct EventsDim0<NTY> {
pub tss: Vec<u64>,
pub pulses: Vec<u64>,
pub values: Vec<NTY>,
}
impl<NTY> EventsDim0<NTY> {
#[inline(always)]
pub fn push(&mut self, ts: u64, pulse: u64, value: NTY) {
self.tss.push(ts);
self.pulses.push(pulse);
self.values.push(value);
}
}
impl<NTY> Empty for EventsDim0<NTY> {
fn empty() -> Self {
Self {
tss: vec![],
pulses: vec![],
values: vec![],
}
}
}
impl<NTY> fmt::Debug for EventsDim0<NTY>
where
NTY: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"count {} ts {:?} .. {:?} vals {:?} .. {:?}",
self.tss.len(),
self.tss.first(),
self.tss.last(),
self.values.first(),
self.values.last(),
)
}
}
impl<NTY> WithLen for EventsDim0<NTY> {
fn len(&self) -> usize {
self.tss.len()
}
}
impl<NTY> TimeSeries for EventsDim0<NTY> {
fn ts_min(&self) -> Option<u64> {
self.tss.first().map(Clone::clone)
}
fn ts_max(&self) -> Option<u64> {
self.tss.last().map(Clone::clone)
}
fn ts_min_max(&self) -> Option<(u64, u64)> {
if self.tss.len() == 0 {
None
} else {
Some((self.tss.first().unwrap().clone(), self.tss.last().unwrap().clone()))
}
}
}
impl<NTY: ScalarOps> TimeBinnableType for EventsDim0<NTY> {
type Output = MinMaxAvgDim0Bins<NTY>;
type Aggregator = EventValuesAggregator<NTY>;
fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
let self_name = std::any::type_name::<Self>();
debug!(
"TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}",
range, x_bin_count, do_time_weight
);
Self::Aggregator::new(range, do_time_weight)
}
}
pub struct EventValuesCollector<NTY> {
vals: EventsDim0<NTY>,
range_complete: bool,
timed_out: bool,
}
impl<NTY> EventValuesCollector<NTY> {
pub fn new() -> Self {
Self {
vals: EventsDim0::empty(),
range_complete: false,
timed_out: false,
}
}
}
impl<NTY> WithLen for EventValuesCollector<NTY> {
fn len(&self) -> usize {
self.vals.tss.len()
}
}
#[derive(Serialize)]
pub struct EventValuesCollectorOutput<NTY> {
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "tsMs")]
ts_off_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
#[serde(rename = "pulseAnchor")]
pulse_anchor: u64,
#[serde(rename = "pulseOff")]
pulse_off: Vec<u64>,
values: Vec<NTY>,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
range_complete: bool,
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
timed_out: bool,
}
impl<NTY: ScalarOps> ToJsonResult for EventValuesCollectorOutput<NTY> {
fn to_json_result(&self) -> Result<Box<dyn crate::streams::ToJsonBytes>, Error> {
let k = serde_json::to_value(self)?;
Ok(Box::new(k))
}
}
impl<NTY: ScalarOps> CollectorType for EventValuesCollector<NTY> {
type Input = EventsDim0<NTY>;
type Output = EventValuesCollectorOutput<NTY>;
fn ingest(&mut self, src: &mut Self::Input) {
// TODO could be optimized by non-contiguous container.
self.vals.tss.append(&mut src.tss);
self.vals.pulses.append(&mut src.pulses);
self.vals.values.append(&mut src.values);
}
fn set_range_complete(&mut self) {
self.range_complete = true;
}
fn set_timed_out(&mut self) {
self.timed_out = true;
}
fn result(&mut self) -> Result<Self::Output, Error> {
let tst = ts_offs_from_abs(&self.vals.tss);
let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses);
let ret = Self::Output {
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
pulse_anchor,
pulse_off,
values: mem::replace(&mut self.vals.values, Vec::new()),
range_complete: self.range_complete,
timed_out: self.timed_out,
};
Ok(ret)
}
}
impl<NTY: ScalarOps> CollectableType for EventsDim0<NTY> {
type Collector = EventValuesCollector<NTY>;
fn new_collector() -> Self::Collector {
Self::Collector::new()
}
}
pub struct EventValuesAggregator<NTY> {
range: NanoRange,
count: u64,
min: NTY,
max: NTY,
sumc: u64,
sum: f32,
int_ts: u64,
last_ts: u64,
last_val: Option<NTY>,
do_time_weight: bool,
events_taken_count: u64,
events_ignored_count: u64,
}
impl<NTY> Drop for EventValuesAggregator<NTY> {
fn drop(&mut self) {
// TODO collect as stats for the request context:
trace!(
"taken {} ignored {}",
self.events_taken_count,
self.events_ignored_count
);
}
}
impl<NTY: ScalarOps> EventValuesAggregator<NTY> {
pub fn new(range: NanoRange, do_time_weight: bool) -> Self {
let int_ts = range.beg;
Self {
range,
count: 0,
min: NTY::zero(),
max: NTY::zero(),
sum: 0.,
sumc: 0,
int_ts,
last_ts: 0,
last_val: None,
do_time_weight,
events_taken_count: 0,
events_ignored_count: 0,
}
}
// TODO reduce clone.. optimize via more traits to factor the trade-offs?
fn apply_min_max(&mut self, val: NTY) {
if self.count == 0 {
self.min = val.clone();
self.max = val.clone();
} else {
if self.min > val {
self.min = val.clone();
}
if self.max < val {
self.max = val.clone();
}
}
}
fn apply_event_unweight(&mut self, val: NTY) {
let vf = val.as_prim_f32();
self.apply_min_max(val);
if vf.is_nan() {
} else {
self.sum += vf;
self.sumc += 1;
}
}
fn apply_event_time_weight(&mut self, ts: u64) {
if let Some(v) = &self.last_val {
let vf = v.as_prim_f32();
let v2 = v.clone();
self.apply_min_max(v2);
let w = if self.do_time_weight {
(ts - self.int_ts) as f32 * 1e-9
} else {
1.
};
if vf.is_nan() {
} else {
self.sum += vf * w;
self.sumc += 1;
}
self.int_ts = ts;
} else {
debug!(
"apply_event_time_weight NO VALUE {}",
ts as i64 - self.range.beg as i64
);
}
}
fn ingest_unweight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let val = item.values[i1].clone();
if ts < self.range.beg {
self.events_ignored_count += 1;
} else if ts >= self.range.end {
self.events_ignored_count += 1;
return;
} else {
self.apply_event_unweight(val);
self.count += 1;
self.events_taken_count += 1;
}
}
}
fn ingest_time_weight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let val = item.values[i1].clone();
if ts < self.int_ts {
if self.last_val.is_none() {
info!(
"ingest_time_weight event before range, only set last ts {} val {:?}",
ts, val
);
}
self.events_ignored_count += 1;
self.last_ts = ts;
self.last_val = Some(val);
} else if ts >= self.range.end {
self.events_ignored_count += 1;
return;
} else {
self.apply_event_time_weight(ts);
if self.last_val.is_none() {
info!(
"call apply_min_max without last val, use current instead {} {:?}",
ts, val
);
self.apply_min_max(val.clone());
}
self.count += 1;
self.last_ts = ts;
self.last_val = Some(val);
self.events_taken_count += 1;
}
}
}
fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgDim0Bins<NTY> {
let (min, max, avg) = if self.sumc > 0 {
let avg = self.sum / self.sumc as f32;
(self.min.clone(), self.max.clone(), avg)
} else {
let g = match &self.last_val {
Some(x) => x.clone(),
None => NTY::zero(),
};
(g.clone(), g.clone(), g.as_prim_f32())
};
let ret = MinMaxAvgDim0Bins {
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![min],
maxs: vec![max],
avgs: vec![avg],
};
self.int_ts = range.beg;
self.range = range;
self.count = 0;
self.sum = 0f32;
self.sumc = 0;
ret
}
fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgDim0Bins<NTY> {
// TODO check callsite for correct expand status.
if expand {
debug!("result_reset_time_weight calls apply_event_time_weight");
self.apply_event_time_weight(self.range.end);
} else {
debug!("result_reset_time_weight NO EXPAND");
}
let (min, max, avg) = if self.sumc > 0 {
let avg = self.sum / (self.range.delta() as f32 * 1e-9);
(self.min.clone(), self.max.clone(), avg)
} else {
let g = match &self.last_val {
Some(x) => x.clone(),
None => NTY::zero(),
};
(g.clone(), g.clone(), g.as_prim_f32())
};
let ret = MinMaxAvgDim0Bins {
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![min],
maxs: vec![max],
avgs: vec![avg],
};
self.int_ts = range.beg;
self.range = range;
self.count = 0;
self.sum = 0f32;
self.sumc = 0;
ret
}
}
impl<NTY: ScalarOps> TimeBinnableTypeAggregator for EventValuesAggregator<NTY> {
type Input = EventsDim0<NTY>;
type Output = MinMaxAvgDim0Bins<NTY>;
fn range(&self) -> &NanoRange {
&self.range
}
fn ingest(&mut self, item: &Self::Input) {
debug!("ingest len {}", item.len());
if self.do_time_weight {
self.ingest_time_weight(item)
} else {
self.ingest_unweight(item)
}
}
fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
debug!("Produce for {:?} next {:?}", self.range, range);
if self.do_time_weight {
self.result_reset_time_weight(range, expand)
} else {
self.result_reset_unweight(range, expand)
}
}
}
impl<NTY: ScalarOps> TimeBinnable for EventsDim0<NTY> {
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinner> {
let ret = ScalarEventsTimeBinner::<NTY>::new(edges.into(), do_time_weight);
Box::new(ret)
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}
impl<NTY: ScalarOps> Events for EventsDim0<NTY> {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable {
self as &dyn TimeBinnable
}
fn verify(&self) {
let mut ts_max = 0;
for ts in &self.tss {
let ts = *ts;
if ts < ts_max {
error!("unordered event data ts {} ts_max {}", ts, ts_max);
}
ts_max = ts_max.max(ts);
}
}
fn output_info(&self) {
if false {
info!("output_info len {}", self.tss.len());
if self.tss.len() == 1 {
info!(
" only: ts {} pulse {} value {:?}",
self.tss[0], self.pulses[0], self.values[0]
);
} else if self.tss.len() > 1 {
info!(
" first: ts {} pulse {} value {:?}",
self.tss[0], self.pulses[0], self.values[0]
);
let n = self.tss.len() - 1;
info!(
" last: ts {} pulse {} value {:?}",
self.tss[n], self.pulses[n], self.values[n]
);
}
}
}
fn as_collectable_mut(&mut self) -> &mut dyn crate::streams::Collectable {
self
}
}
pub struct ScalarEventsTimeBinner<NTY: ScalarOps> {
// The first two edges are used the next time that we create an aggregator, or push a zero bin.
edges: VecDeque<u64>,
do_time_weight: bool,
agg: Option<EventValuesAggregator<NTY>>,
ready: Option<<EventValuesAggregator<NTY> as TimeBinnableTypeAggregator>::Output>,
}
impl<NTY: ScalarOps> ScalarEventsTimeBinner<NTY> {
fn new(edges: VecDeque<u64>, do_time_weight: bool) -> Self {
Self {
edges,
do_time_weight,
agg: None,
ready: None,
}
}
fn next_bin_range(&mut self) -> Option<NanoRange> {
if self.edges.len() >= 2 {
let ret = NanoRange {
beg: self.edges[0],
end: self.edges[1],
};
self.edges.pop_front();
Some(ret)
} else {
None
}
}
}
impl<NTY: ScalarOps> TimeBinner for ScalarEventsTimeBinner<NTY> {
fn bins_ready_count(&self) -> usize {
match &self.ready {
Some(k) => k.len(),
None => 0,
}
}
fn bins_ready(&mut self) -> Option<Box<dyn crate::TimeBinned>> {
match self.ready.take() {
Some(k) => Some(Box::new(k)),
None => None,
}
}
fn ingest(&mut self, item: &dyn TimeBinnable) {
const SELF: &str = "ScalarEventsTimeBinner";
if item.len() == 0 {
// Return already here, RangeOverlapInfo would not give much sense.
return;
}
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {SELF} no more bin in edges A");
return;
}
// TODO optimize by remembering at which event array index we have arrived.
// That needs modified interfaces which can take and yield the start and latest index.
loop {
while item.starts_after(NanoRange {
beg: 0,
end: self.edges[1],
}) {
self.cycle();
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {SELF} no more bin in edges B");
return;
}
}
if item.ends_before(NanoRange {
beg: self.edges[0],
end: u64::MAX,
}) {
return;
} else {
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {SELF} edge list exhausted");
return;
} else {
let agg = if let Some(agg) = self.agg.as_mut() {
agg
} else {
self.agg = Some(EventValuesAggregator::new(
// We know here that we have enough edges for another bin.
// and `next_bin_range` will pop the first edge.
self.next_bin_range().unwrap(),
self.do_time_weight,
));
self.agg.as_mut().unwrap()
};
if let Some(item) = item
.as_any()
// TODO make statically sure that we attempt to cast to the correct type here:
.downcast_ref::<<EventValuesAggregator<NTY> as TimeBinnableTypeAggregator>::Input>()
{
// TODO collect statistics associated with this request:
agg.ingest(item);
} else {
error!("not correct item type");
};
if item.ends_after(agg.range().clone()) {
self.cycle();
if self.edges.len() < 2 {
warn!("TimeBinnerDyn for {SELF} no more bin in edges C");
return;
}
} else {
break;
}
}
}
}
}
fn push_in_progress(&mut self, push_empty: bool) {
// TODO expand should be derived from AggKind. Is it still required after all?
// TODO here, the expand means that agg will assume that the current value is kept constant during
// the rest of the time range.
let expand = true;
let range_next = if self.agg.is_some() {
if let Some(x) = self.next_bin_range() {
Some(x)
} else {
None
}
} else {
None
};
if let Some(agg) = self.agg.as_mut() {
let bins;
if let Some(range_next) = range_next {
bins = agg.result_reset(range_next, expand);
} else {
let range_next = NanoRange { beg: 4, end: 5 };
bins = agg.result_reset(range_next, expand);
self.agg = None;
}
assert_eq!(bins.len(), 1);
if push_empty || bins.counts[0] != 0 {
match self.ready.as_mut() {
Some(_ready) => {
error!("TODO eventsdim0 time binner append");
err::todo();
//ready.append(&mut bins);
}
None => {
self.ready = Some(bins);
}
}
}
}
}
fn cycle(&mut self) {
let n = self.bins_ready_count();
self.push_in_progress(true);
if self.bins_ready_count() == n {
if let Some(_range) = self.next_bin_range() {
let bins = MinMaxAvgDim0Bins::<NTY>::empty();
error!("TODO eventsdim0 time binner append");
err::todo();
//bins.append_zero(range.beg, range.end);
match self.ready.as_mut() {
Some(_ready) => {
error!("TODO eventsdim0 time binner append");
err::todo();
//ready.append(&mut bins);
}
None => {
self.ready = Some(bins);
}
}
if self.bins_ready_count() <= n {
error!("failed to push a zero bin");
}
} else {
warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin");
}
}
}
}

items_2/src/items_2.rs Normal file
View File

@@ -0,0 +1,456 @@
pub mod binsdim0;
pub mod eventsdim0;
pub mod streams;
use chrono::{DateTime, TimeZone, Utc};
use futures_util::Stream;
use netpod::log::error;
use netpod::timeunits::*;
use netpod::{AggKind, NanoRange, ScalarType, Shape};
use serde::{Deserialize, Serialize, Serializer};
use std::any::Any;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use streams::Collectable;
pub fn bool_is_false(x: &bool) -> bool {
*x == false
}
pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec<u64>, Vec<u64>) {
let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC;
let ts_anchor_ns = ts_anchor_sec * SEC;
let ts_off_ms: Vec<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
let ts_off_ns = tss
.iter()
.zip(ts_off_ms.iter().map(|&k| k * MS))
.map(|(&j, k)| (j - ts_anchor_ns - k))
.collect();
(ts_anchor_sec, ts_off_ms, ts_off_ns)
}
pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, Vec<u64>) {
let pulse_anchor = pulse.first().map_or(0, |k| *k);
let pulse_off: Vec<_> = pulse.iter().map(|k| *k - pulse_anchor).collect();
(pulse_anchor, pulse_off)
}
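// Worked example (illustrative; SEC and MS are from netpod::timeunits):
//     ts_offs_from_abs(&[3 * SEC + 7 * MS + 11, 4 * SEC + 2 * MS + 5])
//         == (3, vec![7, 1002], vec![11, 5])
// The anchor is the whole-second part of the first timestamp, `tsMs` are
// millisecond offsets from that anchor, and `tsNs` the sub-millisecond
// remainders, keeping the serialized numbers small.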
#[allow(unused)]
const fn is_nan_int<T>(_x: &T) -> bool {
false
}
#[allow(unused)]
fn is_nan_f32(x: f32) -> bool {
x.is_nan()
}
#[allow(unused)]
fn is_nan_f64(x: f64) -> bool {
x.is_nan()
}
pub trait AsPrimF32 {
fn as_prim_f32(&self) -> f32;
}
macro_rules! impl_as_prim_f32 {
($ty:ident) => {
impl AsPrimF32 for $ty {
fn as_prim_f32(&self) -> f32 {
*self as f32
}
}
};
}
impl_as_prim_f32!(u8);
impl_as_prim_f32!(u16);
impl_as_prim_f32!(u32);
impl_as_prim_f32!(u64);
impl_as_prim_f32!(i8);
impl_as_prim_f32!(i16);
impl_as_prim_f32!(i32);
impl_as_prim_f32!(i64);
impl_as_prim_f32!(f32);
impl_as_prim_f32!(f64);
pub trait ScalarOps: fmt::Debug + Clone + PartialOrd + AsPrimF32 + Serialize + Unpin + Send + 'static {
fn zero() -> Self;
}
macro_rules! impl_num_ops {
($ty:ident, $zero:expr) => {
impl ScalarOps for $ty {
fn zero() -> Self {
$zero
}
}
};
}
impl_num_ops!(u8, 0);
impl_num_ops!(u16, 0);
impl_num_ops!(u32, 0);
impl_num_ops!(u64, 0);
impl_num_ops!(i8, 0);
impl_num_ops!(i16, 0);
impl_num_ops!(i32, 0);
impl_num_ops!(i64, 0);
impl_num_ops!(f32, 0.);
impl_num_ops!(f64, 0.);
#[allow(unused)]
struct Ts(u64);
struct Error {
#[allow(unused)]
kind: ErrorKind,
}
impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Self {
Self { kind }
}
}
enum ErrorKind {
#[allow(unused)]
MismatchedType,
}
pub trait WithLen {
fn len(&self) -> usize;
}
pub trait TimeSeries {
fn ts_min(&self) -> Option<u64>;
fn ts_max(&self) -> Option<u64>;
fn ts_min_max(&self) -> Option<(u64, u64)>;
}
pub enum Fits {
Empty,
Lower,
Greater,
Inside,
PartlyLower,
PartlyGreater,
PartlyLowerAndGreater,
}
// TODO can this be removed?
pub trait FitsInside {
fn fits_inside(&self, range: NanoRange) -> Fits;
}
impl<T> FitsInside for T
where
T: TimeSeries,
{
fn fits_inside(&self, range: NanoRange) -> Fits {
if let Some((min, max)) = self.ts_min_max() {
if max <= range.beg {
Fits::Lower
} else if min >= range.end {
Fits::Greater
} else if min < range.beg && max > range.end {
Fits::PartlyLowerAndGreater
} else if min < range.beg {
Fits::PartlyLower
} else if max > range.end {
Fits::PartlyGreater
} else {
Fits::Inside
}
} else {
Fits::Empty
}
}
}
pub trait RangeOverlapInfo {
fn ends_before(&self, range: NanoRange) -> bool;
fn ends_after(&self, range: NanoRange) -> bool;
fn starts_after(&self, range: NanoRange) -> bool;
}
impl<T> RangeOverlapInfo for T
where
T: TimeSeries,
{
fn ends_before(&self, range: NanoRange) -> bool {
if let Some(max) = self.ts_max() {
max <= range.beg
} else {
true
}
}
fn ends_after(&self, range: NanoRange) -> bool {
if let Some(max) = self.ts_max() {
max > range.end
} else {
true
}
}
fn starts_after(&self, range: NanoRange) -> bool {
if let Some(min) = self.ts_min() {
min >= range.end
} else {
true
}
}
}
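// Boundary semantics (sketch): for a container with timestamps 10..=20,
//     ends_before(NanoRange { beg: 25, end: 30 }) == true  // max <= beg
//     starts_after(NanoRange { beg: 0, end: 10 }) == true  // min >= end
// An empty container answers true to all three, so callers may skip it in
// either direction.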
pub trait EmptyForScalarTypeShape {
fn empty(scalar_type: ScalarType, shape: Shape) -> Self;
}
pub trait EmptyForShape {
fn empty(shape: Shape) -> Self;
}
pub trait Empty {
fn empty() -> Self;
}
pub trait AppendEmptyBin {
fn append_empty_bin(&mut self, ts1: u64, ts2: u64);
}
#[derive(Clone, Debug, Deserialize)]
pub struct IsoDateTime(DateTime<Utc>);
impl Serialize for IsoDateTime {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string())
}
}
pub fn make_iso_ts(tss: &[u64]) -> Vec<IsoDateTime> {
tss.iter()
.map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
.collect()
}
pub trait TimeBinner: Send {
fn bins_ready_count(&self) -> usize;
fn bins_ready(&mut self) -> Option<Box<dyn TimeBinned>>;
fn ingest(&mut self, item: &dyn TimeBinnable);
/// If there is a bin in progress with non-zero count, push it to the result set.
/// With push_empty == true, a bin in progress is pushed even if it contains no counts.
fn push_in_progress(&mut self, push_empty: bool);
/// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call
/// to `push_in_progress` did not change the result count, as long as edges are left.
/// The next call to `Self::bins_ready_count` must return one higher count than before.
fn cycle(&mut self);
}
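// Assumed driving loop for a TimeBinner (a sketch; the call sites are not
// part of this commit):
//     let mut binner = first_item.time_binner_new(edges, true);
//     for item in items {
//         binner.ingest(&item);
//     }
//     // Flush the last in-progress bin; `cycle` also pads with a zero-count
//     // bin when nothing was in progress and edges remain.
//     binner.cycle();
//     if let Some(bins) = binner.bins_ready() {
//         // collect or emit the Box<dyn TimeBinned>
//     }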
/// Provides a time-binned representation of the implementing type.
/// In contrast to `TimeBinnableType` this is meant for trait objects.
pub trait TimeBinnable: WithLen + RangeOverlapInfo + Any + Send {
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinner>;
fn as_any(&self) -> &dyn Any;
}
/// Container of some form of events, for use as trait object.
pub trait Events: Collectable + TimeBinnable {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
fn verify(&self);
fn output_info(&self);
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;
}
/// Data in time-binned form.
pub trait TimeBinned: TimeBinnable {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
fn edges_slice(&self) -> (&[u64], &[u64]);
fn counts(&self) -> &[u64];
fn mins(&self) -> Vec<f32>;
fn maxs(&self) -> Vec<f32>;
fn avgs(&self) -> Vec<f32>;
fn validate(&self) -> Result<(), String>;
}
pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo {
type Output: TimeBinnableType;
type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;
fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator;
}
pub trait TimeBinnableTypeAggregator: Send {
type Input: TimeBinnableType;
type Output: TimeBinnableType;
fn range(&self) -> &NanoRange;
fn ingest(&mut self, item: &Self::Input);
fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output;
}
pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box<dyn TimeBinnable> {
match shape {
Shape::Scalar => match agg_kind {
AggKind::TimeWeightedScalar => {
use ScalarType::*;
type K<T> = eventsdim0::EventsDim0<T>;
match scalar_type {
U8 => Box::new(K::<u8>::empty()),
U16 => Box::new(K::<u16>::empty()),
U32 => Box::new(K::<u32>::empty()),
U64 => Box::new(K::<u64>::empty()),
I8 => Box::new(K::<i8>::empty()),
I16 => Box::new(K::<i16>::empty()),
I32 => Box::new(K::<i32>::empty()),
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
_ => {
error!("TODO empty_events_dyn");
err::todoval()
}
}
}
_ => {
error!("TODO empty_events_dyn");
err::todoval()
}
},
Shape::Wave(..) => {
error!("TODO empty_events_dyn");
err::todoval()
}
Shape::Image(..) => {
error!("TODO empty_events_dyn");
err::todoval()
}
}
}
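// For example (sketch): empty_events_dyn(&ScalarType::F64, &Shape::Scalar,
// &AggKind::TimeWeightedScalar) yields a Box<dyn TimeBinnable> wrapping
// EventsDim0::<f64>::empty(); combinations not matched above currently
// fall through to err::todoval().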
pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box<dyn TimeBinnable> {
match shape {
Shape::Scalar => match agg_kind {
AggKind::TimeWeightedScalar => {
use ScalarType::*;
type K<T> = binsdim0::MinMaxAvgDim0Bins<T>;
match scalar_type {
U8 => Box::new(K::<u8>::empty()),
U16 => Box::new(K::<u16>::empty()),
U32 => Box::new(K::<u32>::empty()),
U64 => Box::new(K::<u64>::empty()),
I8 => Box::new(K::<i8>::empty()),
I16 => Box::new(K::<i16>::empty()),
I32 => Box::new(K::<i32>::empty()),
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
_ => {
error!("TODO empty_binned_dyn");
err::todoval()
}
}
}
_ => {
error!("TODO empty_binned_dyn");
err::todoval()
}
},
Shape::Wave(_n) => match agg_kind {
AggKind::DimXBins1 => {
use ScalarType::*;
type K<T> = binsdim0::MinMaxAvgDim0Bins<T>;
match scalar_type {
U8 => Box::new(K::<u8>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
_ => {
error!("TODO empty_binned_dyn");
err::todoval()
}
}
}
_ => {
error!("TODO empty_binned_dyn");
err::todoval()
}
},
Shape::Image(..) => {
error!("TODO empty_binned_dyn");
err::todoval()
}
}
}
pub enum ConnStatus {}
pub struct ConnStatusEvent {
pub ts: u64,
pub status: ConnStatus,
}
trait MergableEvents: Any {
fn len(&self) -> usize;
fn ts_min(&self) -> Option<u64>;
fn ts_max(&self) -> Option<u64>;
fn take_from(&mut self, src: &mut dyn MergableEvents, ts_end: u64) -> Result<(), Error>;
}
pub enum ChannelEvents {
Events(Box<dyn Events>),
Status(ConnStatusEvent),
RangeComplete,
}
impl MergableEvents for ChannelEvents {
fn len(&self) -> usize {
error!("TODO MergableEvents");
todo!()
}
fn ts_min(&self) -> Option<u64> {
error!("TODO MergableEvents");
todo!()
}
fn ts_max(&self) -> Option<u64> {
error!("TODO MergableEvents");
todo!()
}
fn take_from(&mut self, _src: &mut dyn MergableEvents, _ts_end: u64) -> Result<(), Error> {
error!("TODO MergableEvents");
todo!()
}
}
struct ChannelEventsMerger {
_inp_1: Pin<Box<dyn Stream<Item = Result<Box<dyn MergableEvents>, Error>>>>,
_inp_2: Pin<Box<dyn Stream<Item = Result<Box<dyn MergableEvents>, Error>>>>,
}
impl Stream for ChannelEventsMerger {
type Item = Result<ChannelEvents, Error>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
//use Poll::*;
error!("TODO ChannelEventsMerger");
err::todoval()
}
}
// TODO do this with some blanket impl:
impl Collectable for Box<dyn Collectable> {
fn new_collector(&self) -> Box<dyn streams::Collector> {
Collectable::new_collector(self.as_ref())
}
fn as_any_mut(&mut self) -> &mut dyn Any {
Collectable::as_any_mut(self.as_mut())
}
}

items_2/src/streams.rs Normal file
View File

@@ -0,0 +1,77 @@
use crate::WithLen;
use err::Error;
use serde::Serialize;
use std::any::Any;
pub trait CollectorType: Send + Unpin + WithLen {
type Input: Collectable;
type Output: ToJsonResult + Serialize;
fn ingest(&mut self, src: &mut Self::Input);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);
fn result(&mut self) -> Result<Self::Output, Error>;
}
pub trait Collector: Send + Unpin + WithLen {
fn ingest(&mut self, src: &mut dyn Collectable);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);
fn result(&mut self) -> Result<Box<dyn ToJsonResult>, Error>;
}
pub trait CollectableType {
type Collector: CollectorType<Input = Self>;
fn new_collector() -> Self::Collector;
}
pub trait Collectable: Any {
fn new_collector(&self) -> Box<dyn Collector>;
fn as_any_mut(&mut self) -> &mut dyn Any;
}
impl<T: CollectorType + 'static> Collector for T {
fn ingest(&mut self, src: &mut dyn Collectable) {
let src: &mut <T as CollectorType>::Input = src.as_any_mut().downcast_mut().expect("can not downcast");
T::ingest(self, src)
}
fn set_range_complete(&mut self) {
T::set_range_complete(self)
}
fn set_timed_out(&mut self) {
T::set_timed_out(self)
}
fn result(&mut self) -> Result<Box<dyn ToJsonResult>, Error> {
let ret = T::result(self)?;
Ok(Box::new(ret) as _)
}
}
impl<T: CollectableType + 'static> Collectable for T {
fn new_collector(&self) -> Box<dyn Collector> {
Box::new(T::new_collector()) as _
}
fn as_any_mut(&mut self) -> &mut dyn Any {
// TODO interesting: why exactly does returning `&mut self` not work here?
self
}
}
pub trait ToJsonBytes {
fn to_json_bytes(&self) -> Result<Vec<u8>, Error>;
}
pub trait ToJsonResult {
fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error>;
}
impl ToJsonBytes for serde_json::Value {
fn to_json_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(serde_json::to_vec(self)?)
}
}
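Together these traits form the double-dispatch path from a type-erased item to JSON that `EventsHandlerScylla::gather` relies on; a minimal sketch, assuming `item: &mut dyn Collectable`:

    let mut coll = item.new_collector();
    // Collector::ingest downcasts back to the concrete CollectorType::Input.
    coll.ingest(item);
    let bytes = coll.result()?       // Box<dyn ToJsonResult>
        .to_json_result()?           // Box<dyn ToJsonBytes>
        .to_json_bytes()?;           // Vec<u8> of serialized JSON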

View File

@@ -94,6 +94,182 @@ impl RawEventsQuery {
}
}
// TODO move this query type out of this `binned` mod
#[derive(Clone, Debug)]
pub struct PlainEventsQuery {
channel: Channel,
range: NanoRange,
disk_io_buffer_size: usize,
report_error: bool,
timeout: Duration,
events_max: Option<u64>,
do_log: bool,
do_test_main_error: bool,
do_test_stream_error: bool,
}
impl PlainEventsQuery {
pub fn new(
channel: Channel,
range: NanoRange,
disk_io_buffer_size: usize,
events_max: Option<u64>,
do_log: bool,
) -> Self {
Self {
channel,
range,
disk_io_buffer_size,
report_error: false,
timeout: Duration::from_millis(10000),
events_max,
do_log,
do_test_main_error: false,
do_test_stream_error: false,
}
}
pub fn channel(&self) -> &Channel {
&self.channel
}
pub fn range(&self) -> &NanoRange {
&self.range
}
pub fn report_error(&self) -> bool {
self.report_error
}
pub fn disk_io_buffer_size(&self) -> usize {
self.disk_io_buffer_size
}
pub fn timeout(&self) -> Duration {
self.timeout
}
pub fn events_max(&self) -> Option<u64> {
self.events_max
}
pub fn do_log(&self) -> bool {
self.do_log
}
pub fn do_test_main_error(&self) -> bool {
self.do_test_main_error
}
pub fn do_test_stream_error(&self) -> bool {
self.do_test_stream_error
}
pub fn set_series_id(&mut self, series: u64) {
self.channel.series = Some(series);
}
pub fn set_timeout(&mut self, k: Duration) {
self.timeout = k;
}
pub fn set_do_test_main_error(&mut self, k: bool) {
self.do_test_main_error = k;
}
pub fn set_do_test_stream_error(&mut self, k: bool) {
self.do_test_stream_error = k;
}
}
impl HasBackend for PlainEventsQuery {
fn backend(&self) -> &str {
&self.channel.backend
}
}
impl HasTimeout for PlainEventsQuery {
fn timeout(&self) -> Duration {
        self.timeout
}
}
impl FromUrl for PlainEventsQuery {
fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &std::collections::BTreeMap<String, String>) -> Result<Self, Error> {
let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?;
let ret = Self {
channel: Channel::from_pairs(&pairs)?,
range: NanoRange {
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
},
disk_io_buffer_size: pairs
.get("diskIoBufferSize")
.map_or("4096", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse diskIoBufferSize {:?}", e)))?,
report_error: pairs
.get("reportError")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?,
timeout: pairs
.get("timeout")
.map_or("10000", |k| k)
.parse::<u64>()
.map(|k| Duration::from_millis(k))
.map_err(|e| Error::with_public_msg(format!("can not parse timeout {:?}", e)))?,
            events_max: pairs
                .get("eventsMax")
                .map_or(Ok(None), |k| k.parse().map(Some))
                .map_err(|e| Error::with_public_msg(format!("can not parse eventsMax {:?}", e)))?,
do_log: pairs
.get("doLog")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?,
do_test_main_error: pairs
.get("doTestMainError")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError {:?}", e)))?,
do_test_stream_error: pairs
.get("doTestStreamError")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?,
};
Ok(ret)
}
}
impl AppendToUrl for PlainEventsQuery {
fn append_to_url(&self, url: &mut Url) {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
self.channel.append_to_url(url);
let mut g = url.query_pairs_mut();
g.append_pair(
"begDate",
&Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
);
g.append_pair(
"endDate",
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
);
g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size));
g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
if let Some(x) = self.events_max.as_ref() {
g.append_pair("eventsMax", &format!("{}", x));
}
g.append_pair("doLog", &format!("{}", self.do_log));
}
}
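// A hedged round-trip sketch (hypothetical helper, not part of this commit).
// Note the asymmetry: `append_to_url` emits begDate/endDate with millisecond
// precision and omits `reportError` and the `doTest*` flags, so a round trip
// truncates sub-millisecond range bounds and resets those flags to defaults.
fn _plain_events_query_round_trip(channel: Channel, range: NanoRange) -> Result<PlainEventsQuery, Error> {
    let mut url = Url::parse("http://localhost:8371/api/4/events")
        .map_err(|e| Error::with_public_msg(format!("bad base url {e:?}")))?;
    let q = PlainEventsQuery::new(channel, range, 4096, Some(10000), false);
    q.append_to_url(&mut url);
    PlainEventsQuery::from_url(&url)
}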
#[derive(Clone, Debug)]
pub struct BinnedQuery {
channel: Channel,

View File

@@ -23,8 +23,8 @@ futures-core = "0.3.14"
futures-util = "0.3.14"
tracing = "0.1.25"
hex = "0.4.3"
scylla = "0.4.4"
tokio-postgres = "0.7.6"
scylla = "0.5"
tokio-postgres = "0.7.7"
err = { path = "../err" }
netpod = { path = "../netpod" }
disk = { path = "../disk" }

27
scyllaconn/Cargo.toml Normal file
View File

@@ -0,0 +1,27 @@
[package]
name = "scyllaconn"
version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/scyllaconn.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.2"
erased-serde = "0.3"
tokio = { version = "1.20.1", default-features = false, features = ["time", "sync"] }
byteorder = "1.4.3"
bytes = "1.2.1"
num-traits = "0.2.15"
chrono = { version = "0.4.19", features = ["serde"] }
crc32fast = "1.3.2"
futures-util = "0.3.24"
async-channel = "1.7.1"
scylla = "0.5"
tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] }
err = { path = "../err" }
netpod = { path = "../netpod" }
items_2 = { path = "../items_2" }

468
scyllaconn/src/bincache.rs Normal file
View File

@@ -0,0 +1,468 @@
use crate::errconv::ErrConv;
use crate::events::EventsStreamScylla;
use err::Error;
use futures_util::{Future, Stream, StreamExt};
use items_2::binsdim0::MinMaxAvgDim0Bins;
use items_2::{empty_binned_dyn, empty_events_dyn, ChannelEvents, TimeBinned};
use netpod::log::*;
use netpod::query::{CacheUsage, PlainEventsQuery, RawEventsQuery};
use netpod::timeunits::*;
use netpod::{AggKind, ChannelTyped, ScalarType, Shape};
use netpod::{PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange};
use scylla::Session as ScySession;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
pub async fn read_cached_scylla(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
_agg_kind: AggKind,
scy: &ScySession,
) -> Result<Option<Box<dyn TimeBinned>>, Error> {
let vals = (
series as i64,
(coord.bin_t_len() / SEC) as i32,
(coord.patch_t_len() / SEC) as i32,
coord.ix() as i64,
);
let res = scy
.query_iter(
"select counts, avgs, mins, maxs from binned_scalar_f32 where series = ? and bin_len_sec = ? and patch_len_sec = ? and agg_kind = 'dummy-agg-kind' and offset = ?",
vals,
)
.await;
let mut res = res.err_conv().map_err(|e| {
error!("can not read from cache");
e
})?;
while let Some(item) = res.next().await {
let row = item.err_conv()?;
let edges = coord.edges();
let (counts, avgs, mins, maxs): (Vec<i64>, Vec<f32>, Vec<f32>, Vec<f32>) = row.into_typed().err_conv()?;
let mut counts_mismatch = false;
if edges.len() != counts.len() + 1 {
counts_mismatch = true;
}
if counts.len() != avgs.len() {
counts_mismatch = true;
}
        let ts1s = edges[..edges.len().saturating_sub(1)].to_vec();
        let ts2s = edges[1.min(edges.len())..].to_vec();
if ts1s.len() != ts2s.len() {
error!("ts1s vs ts2s mismatch");
counts_mismatch = true;
}
if ts1s.len() != counts.len() {
counts_mismatch = true;
}
        let mins = mins.into_iter().map(|x| x as _).collect::<Vec<_>>();
        let maxs = maxs.into_iter().map(|x| x as _).collect::<Vec<_>>();
if counts_mismatch {
error!(
"mismatch: edges {} ts1s {} ts2s {} counts {} avgs {} mins {} maxs {}",
edges.len(),
ts1s.len(),
ts2s.len(),
counts.len(),
avgs.len(),
mins.len(),
maxs.len(),
);
}
let counts: Vec<_> = counts.into_iter().map(|x| x as u64).collect();
// TODO construct a dyn TimeBinned using the scalar type and shape information.
// TODO place the values with little copying into the TimeBinned.
use ScalarType::*;
use Shape::*;
match &chn.shape {
Scalar => match &chn.scalar_type {
F64 => {
let ret = MinMaxAvgDim0Bins::<f64> {
ts1s,
ts2s,
counts,
avgs,
mins,
maxs,
};
return Ok(Some(Box::new(ret)));
}
_ => {
error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape);
err::todoval()
}
},
_ => {
error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape);
err::todoval()
}
}
}
Ok(None)
}
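// Hedged sketch of the cache table the query above reads, inferred from the
// statements in this file (the real schema may differ):
//
//     create table binned_scalar_f32 (
//         series bigint, bin_len_sec int, patch_len_sec int, agg_kind text,
//         offset bigint, counts list<bigint>,
//         avgs list<float>, mins list<float>, maxs list<float>,
//         primary key ((series, bin_len_sec, patch_len_sec, agg_kind), offset)
//     );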
#[allow(unused)]
struct WriteFut<'a> {
chn: &'a ChannelTyped,
coord: &'a PreBinnedPatchCoord,
data: &'a dyn TimeBinned,
scy: &'a ScySession,
}
impl<'a> WriteFut<'a> {
#[allow(unused)]
fn new(
chn: &'a ChannelTyped,
coord: &'a PreBinnedPatchCoord,
data: &'a dyn TimeBinned,
scy: &'a ScySession,
) -> Self {
Self { chn, coord, data, scy }
}
}
impl<'a> Future for WriteFut<'a> {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let _ = cx;
Poll::Ready(Ok(()))
}
}
pub fn write_cached_scylla<'a>(
series: u64,
_chn: &'a ChannelTyped,
coord: &'a PreBinnedPatchCoord,
//data: &'a dyn TimeBinned,
data: Box<dyn TimeBinned>,
//scy: &'a ScySession,
_scy: Arc<ScySession>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
//let _chn = unsafe { &*(chn as *const ChannelTyped) };
//let data_ptr = data as *const dyn TimeBinned as usize;
//let scy_ptr = scy as *const ScySession as usize;
let fut = async move {
//let data = unsafe { &*(data_ptr as *const dyn TimeBinned) };
//let scy = unsafe { &*(scy_ptr as *const ScySession) };
let bin_len_sec = (coord.bin_t_len() / SEC) as i32;
let patch_len_sec = (coord.patch_t_len() / SEC) as i32;
let offset = coord.ix();
warn!(
"write_cached_scylla len {} where series = {} and bin_len_sec = {} and patch_len_sec = {} and agg_kind = 'dummy-agg-kind' and offset = {}",
data.counts().len(),
series,
bin_len_sec,
patch_len_sec,
offset,
);
let _data2 = data.counts().iter().map(|x| *x as i64).collect::<Vec<_>>();
/*
let stmt = scy.prepare("insert into binned_scalar_f32 (series, bin_len_sec, patch_len_sec, agg_kind, offset, counts, avgs, mins, maxs) values (?, ?, ?, 'dummy-agg-kind', ?, ?, ?, ?, ?)").await.err_conv()?;
scy.execute(
&stmt,
(
series as i64,
bin_len_sec,
patch_len_sec,
offset as i64,
data2,
data.avgs(),
data.mins(),
data.maxs(),
),
)
.await
.err_conv()
.map_err(|e| {
error!("can not write to cache");
e
})?;
*/
Ok(())
};
Box::pin(fut)
}
pub async fn fetch_uncached_data(
series: u64,
chn: ChannelTyped,
coord: PreBinnedPatchCoord,
agg_kind: AggKind,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Result<Option<(Box<dyn TimeBinned>, bool)>, Error> {
info!("fetch_uncached_data {coord:?}");
// Try to find a higher resolution pre-binned grid which covers the requested patch.
let (bin, complete) = match PreBinnedPatchRange::covering_range(coord.patch_range(), coord.bin_count() + 1) {
Ok(Some(range)) => {
if coord.patch_range() != range.range() {
error!(
"The chosen covering range does not exactly cover the requested patch {:?} vs {:?}",
coord.patch_range(),
range.range()
);
}
fetch_uncached_higher_res_prebinned(
series,
&chn,
coord.clone(),
range,
agg_kind,
cache_usage.clone(),
scy.clone(),
)
.await
}
Ok(None) => fetch_uncached_binned_events(series, &chn, coord.clone(), agg_kind, scy.clone()).await,
Err(e) => Err(e),
}?;
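    // NOTE the `true ||` below forces the validation/write path to run even
    // for incomplete patches while this refactor is WIP.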
if true || complete {
let edges = coord.edges();
if edges.len() < bin.len() + 1 {
error!(
"attempt to write overfull bin to cache edges {} bin {}",
edges.len(),
bin.len()
);
            return Err(Error::with_msg_no_trace(
                "attempt to write overfull bin to cache",
            ));
} else if edges.len() > bin.len() + 1 {
let missing = edges.len() - bin.len() - 1;
error!("attempt to write incomplete bin to cache missing {missing}");
}
if let CacheUsage::Use | CacheUsage::Recreate = &cache_usage {
// TODO pass data in safe way.
let _data = bin.as_ref();
//let fut = WriteFut::new(&chn, &coord, err::todoval(), &scy);
//fut.await?;
//write_cached_scylla(series, &chn, &coord, bin.as_ref(), &scy).await?;
}
}
Ok(Some((bin, complete)))
}
pub fn fetch_uncached_data_box(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
agg_kind: AggKind,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Pin<Box<dyn Future<Output = Result<Option<(Box<dyn TimeBinned>, bool)>, Error>> + Send>> {
Box::pin(fetch_uncached_data(
series,
chn.clone(),
coord.clone(),
agg_kind,
cache_usage,
scy,
))
}
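// `fetch_uncached_data_box` exists to break the recursive async cycle
// fetch_uncached_data -> fetch_uncached_higher_res_prebinned
// -> pre_binned_value_stream_with_scy -> fetch_uncached_data:
// a directly recursive `async fn` would have an infinitely sized future, so
// one edge of the cycle is boxed. The same pattern in miniature (generic
// illustration, not code from this commit):
fn _recursion_sketch(n: u32) -> Pin<Box<dyn Future<Output = u32> + Send>> {
    Box::pin(async move { if n == 0 { 0 } else { _recursion_sketch(n - 1).await + 1 } })
}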
pub async fn fetch_uncached_higher_res_prebinned(
series: u64,
chn: &ChannelTyped,
coord: PreBinnedPatchCoord,
range: PreBinnedPatchRange,
agg_kind: AggKind,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Result<(Box<dyn TimeBinned>, bool), Error> {
let edges = coord.edges();
// TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there.
let do_time_weight = true;
// We must produce some result with correct types even if upstream delivers nothing at all.
let bin0 = empty_binned_dyn(&chn.scalar_type, &chn.shape, &agg_kind);
let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight);
let mut complete = true;
let patch_it = PreBinnedPatchIterator::from_range(range.clone());
for patch_coord in patch_it {
// We request data here for a Coord, meaning that we expect to receive multiple bins.
// The expectation is that we receive a single TimeBinned which contains all bins of that PatchCoord.
//let patch_coord = PreBinnedPatchCoord::new(patch.bin_t_len(), patch.patch_t_len(), patch.ix());
let (bin, comp) = pre_binned_value_stream_with_scy(
series,
chn,
&patch_coord,
agg_kind.clone(),
cache_usage.clone(),
scy.clone(),
)
.await?;
if let Err(msg) = bin.validate() {
error!(
"pre-binned intermediate issue {} coord {:?} patch_coord {:?}",
msg, coord, patch_coord
);
}
complete = complete && comp;
time_binner.ingest(bin.as_time_binnable_dyn());
}
// Fixed limit to defend against a malformed implementation:
let mut i = 0;
while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize {
let n1 = time_binner.bins_ready_count();
if false {
trace!(
"pre-binned extra cycle {} {} {}",
i,
time_binner.bins_ready_count(),
coord.bin_count()
);
}
time_binner.cycle();
i += 1;
if time_binner.bins_ready_count() <= n1 {
warn!("pre-binned cycle did not add another bin, break");
break;
}
}
if time_binner.bins_ready_count() < coord.bin_count() as usize {
return Err(Error::with_msg_no_trace(format!(
"pre-binned unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}",
time_binner.bins_ready_count(),
coord.bin_count(),
edges.len(),
)));
}
let ready = time_binner
.bins_ready()
        .ok_or_else(|| Error::with_msg_no_trace("unable to produce any bins for the patch range"))?;
if let Err(msg) = ready.validate() {
error!("pre-binned final issue {} coord {:?}", msg, coord);
}
Ok((ready, complete))
}
pub async fn fetch_uncached_binned_events(
series: u64,
chn: &ChannelTyped,
coord: PreBinnedPatchCoord,
agg_kind: AggKind,
scy: Arc<ScySession>,
) -> Result<(Box<dyn TimeBinned>, bool), Error> {
let edges = coord.edges();
// TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there.
let do_time_weight = true;
// We must produce some result with correct types even if upstream delivers nothing at all.
let bin0 = empty_events_dyn(&chn.scalar_type, &chn.shape, &agg_kind);
let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight);
let deadline = Instant::now();
let deadline = deadline
.checked_add(Duration::from_millis(6000))
        .ok_or_else(|| Error::with_msg_no_trace("deadline overflow"))?;
let _evq = RawEventsQuery::new(chn.channel.clone(), coord.patch_range(), agg_kind);
let evq = PlainEventsQuery::new(chn.channel.clone(), coord.patch_range(), 4096, None, true);
let mut events_dyn = EventsStreamScylla::new(series, &evq, chn.scalar_type.clone(), chn.shape.clone(), scy, false);
let mut complete = false;
loop {
let item = tokio::time::timeout_at(deadline.into(), events_dyn.next()).await;
let item = match item {
Ok(Some(k)) => k,
Ok(None) => break,
Err(_) => {
error!("fetch_uncached_binned_events timeout");
                return Err(Error::with_msg_no_trace(
                    "TODO handle fetch_uncached_binned_events timeout",
                ));
}
};
match item {
Ok(ChannelEvents::Events(item)) => {
time_binner.ingest(item.as_time_binnable_dyn());
// TODO could also ask the binner here whether we are "complete" to stop sending useless data.
}
Ok(ChannelEvents::Status(_)) => {
// TODO flag, should not happen.
                return Err(Error::with_msg_no_trace(
                    "unexpected read of channel status events",
                ));
}
Ok(ChannelEvents::RangeComplete) => {
complete = true;
}
Err(e) => return Err(e),
}
}
// Fixed limit to defend against a malformed implementation:
let mut i = 0;
while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize {
let n1 = time_binner.bins_ready_count();
if false {
trace!(
"events extra cycle {} {} {}",
i,
time_binner.bins_ready_count(),
coord.bin_count()
);
}
time_binner.cycle();
i += 1;
if time_binner.bins_ready_count() <= n1 {
warn!("events cycle did not add another bin, break");
break;
}
}
if time_binner.bins_ready_count() < coord.bin_count() as usize {
return Err(Error::with_msg_no_trace(format!(
"events unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}",
time_binner.bins_ready_count(),
coord.bin_count(),
edges.len(),
)));
}
let ready = time_binner
.bins_ready()
        .ok_or_else(|| Error::with_msg_no_trace("unable to produce any bins for the patch"))?;
if let Err(msg) = ready.validate() {
error!("time binned invalid {} coord {:?}", msg, coord);
}
Ok((ready, complete))
}
pub async fn pre_binned_value_stream_with_scy(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
agg_kind: AggKind,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Result<(Box<dyn TimeBinned>, bool), Error> {
trace!("pre_binned_value_stream_with_scy {chn:?} {coord:?}");
if let (Some(item), CacheUsage::Use) = (
read_cached_scylla(series, chn, coord, agg_kind.clone(), &scy).await?,
&cache_usage,
) {
info!("+++++++++++++ GOOD READ");
Ok((item, true))
} else {
if let CacheUsage::Use = &cache_usage {
warn!("--+--+--+--+--+--+ NOT YET CACHED");
}
let res = fetch_uncached_data_box(series, chn, coord, agg_kind, cache_usage, scy).await?;
let (bin, complete) =
            res.ok_or_else(|| Error::with_msg_no_trace("pre_binned_value_stream_with_scy got None bin"))?;
Ok((bin, complete))
}
}
pub async fn pre_binned_value_stream(
series: u64,
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
agg_kind: AggKind,
cache_usage: CacheUsage,
scy: Arc<ScySession>,
) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn TimeBinned>, Error>> + Send>>, Error> {
trace!("pre_binned_value_stream series {series} {chn:?} {coord:?}");
let res = pre_binned_value_stream_with_scy(series, chn, coord, agg_kind, cache_usage, scy).await?;
error!("TODO pre_binned_value_stream");
err::todo();
Ok(Box::pin(futures_util::stream::iter([Ok(res.0)])))
}

51
scyllaconn/src/errconv.rs Normal file
View File

@@ -0,0 +1,51 @@
use err::Error;
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
pub trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
}
impl<T> ErrConv<T> for Result<T, tokio_postgres::Error> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg(e.to_string())),
}
}
}
impl<T, A> ErrConv<T> for Result<T, async_channel::SendError<A>> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg(e.to_string())),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyQueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
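// The three scylla impls above are identical up to the error type; a macro
// could generate them (a possible deduplication sketch, not part of this
// commit; it would substitute for those impls, not sit alongside them):
macro_rules! impl_err_conv_debug {
    ($($t:ty),*) => {$(
        impl<T> ErrConv<T> for Result<T, $t> {
            fn err_conv(self) -> Result<T, Error> {
                self.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))
            }
        }
    )*};
}
impl_err_conv_debug!(ScyQueryError, ScyNewSessionError, ScyFromRowError);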

606
scyllaconn/src/events.rs Normal file
View File

@@ -0,0 +1,606 @@
use crate::errconv::ErrConv;
use err::Error;
use futures_util::{Future, FutureExt, Stream, StreamExt};
use items_2::eventsdim0::EventsDim0;
use items_2::{ChannelEvents, Empty, Events};
use netpod::log::*;
use netpod::query::{ChannelStateEvents, PlainEventsQuery};
use netpod::timeunits::*;
use netpod::{Channel, Database, NanoRange, ScalarType, Shape};
use scylla::Session as ScySession;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_postgres::Client as PgClient;
macro_rules! read_values {
($fname:ident, $self:expr, $ts_msp:expr) => {{
let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone());
let fut = fut.map(|x| {
match x {
Ok(k) => {
                    // `Box<dyn Events>` already implies `+ 'static`, so the
                    // explicit lifetime bound in an earlier version was redundant.
let b = Box::new(k) as Box<dyn Events>;
Ok(b)
}
Err(e) => Err(e),
}
});
let fut = Box::pin(fut) as Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>;
fut
}};
}
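// For orientation, `read_values!(read_next_values_scalar_f64, self, ts_msp)`
// expands to roughly:
//
//     let fut = read_next_values_scalar_f64(self.series, ts_msp, self.range.clone(), self.fwd, self.scy.clone())
//         .map(|x| x.map(|k| Box::new(k) as Box<dyn Events>));
//     Box::pin(fut) as Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>
//
// i.e. each typed reader future is erased to the common `Box<dyn Events>` item type.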
struct ReadValues {
series: i64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msps: VecDeque<u64>,
fwd: bool,
fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>,
scy: Arc<ScySession>,
}
impl ReadValues {
fn new(
series: i64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msps: VecDeque<u64>,
fwd: bool,
scy: Arc<ScySession>,
) -> Self {
let mut ret = Self {
series,
scalar_type,
shape,
range,
ts_msps,
fwd,
fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace(
"future not initialized",
)))),
scy,
};
ret.next();
ret
}
fn next(&mut self) -> bool {
if let Some(ts_msp) = self.ts_msps.pop_front() {
self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 1);
true
} else {
false
}
}
fn make_fut(
&mut self,
ts_msp: u64,
_has_more_msp: bool,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
let fut = match &self.shape {
Shape::Scalar => match &self.scalar_type {
ScalarType::I8 => {
read_values!(read_next_values_scalar_i8, self, ts_msp)
}
ScalarType::I16 => {
read_values!(read_next_values_scalar_i16, self, ts_msp)
}
ScalarType::I32 => {
read_values!(read_next_values_scalar_i32, self, ts_msp)
}
ScalarType::F32 => {
read_values!(read_next_values_scalar_f32, self, ts_msp)
}
ScalarType::F64 => {
read_values!(read_next_values_scalar_f64, self, ts_msp)
}
_ => {
error!("TODO ReadValues add more types");
err::todoval()
}
},
Shape::Wave(_) => match &self.scalar_type {
ScalarType::U16 => {
read_values!(read_next_values_array_u16, self, ts_msp)
}
_ => {
error!("TODO ReadValues add more types");
err::todoval()
}
},
_ => {
error!("TODO ReadValues add more types");
err::todoval()
}
};
fut
}
}
enum FrState {
New,
FindMsp(Pin<Box<dyn Future<Output = Result<VecDeque<u64>, Error>> + Send>>),
ReadBack1(ReadValues),
ReadBack2(ReadValues),
ReadValues(ReadValues),
Done,
}
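// State flow: New -> FindMsp -> ReadBack1 (newest MSP before the range, to
// fetch the one-before event) -> optionally ReadBack2 (second MSP back, if
// ReadBack1 returned nothing) -> ReadValues (forward over the in-range MSPs)
// -> Done.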
pub struct EventsStreamScylla {
state: FrState,
series: u64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msps: VecDeque<u64>,
scy: Arc<ScySession>,
do_test_stream_error: bool,
}
impl EventsStreamScylla {
pub fn new(
series: u64,
evq: &PlainEventsQuery,
scalar_type: ScalarType,
shape: Shape,
scy: Arc<ScySession>,
do_test_stream_error: bool,
) -> Self {
Self {
state: FrState::New,
series,
scalar_type,
shape,
range: evq.range().clone(),
ts_msps: VecDeque::new(),
scy,
do_test_stream_error,
}
}
fn ts_msps_found(&mut self, ts_msps: VecDeque<u64>) {
info!("found ts_msps {ts_msps:?}");
self.ts_msps = ts_msps;
// Find the largest MSP which can potentially contain some event before the range.
let befores: Vec<_> = self
.ts_msps
.iter()
            .copied()
.filter(|x| *x < self.range.beg)
.collect();
if befores.len() >= 1 {
let st = ReadValues::new(
self.series as i64,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
[befores[befores.len() - 1]].into(),
false,
self.scy.clone(),
);
self.state = FrState::ReadBack1(st);
} else if self.ts_msps.len() >= 1 {
let st = ReadValues::new(
self.series as i64,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
self.ts_msps.clone(),
true,
self.scy.clone(),
);
self.state = FrState::ReadValues(st);
} else {
self.state = FrState::Done;
}
}
fn back_1_done(&mut self, item: Box<dyn Events>) -> Option<Box<dyn Events>> {
info!("back_1_done len {}", item.len());
if item.len() == 0 {
// Find the 2nd largest MSP which can potentially contain some event before the range.
let befores: Vec<_> = self
.ts_msps
.iter()
                .copied()
.filter(|x| *x < self.range.beg)
.collect();
if befores.len() >= 2 {
let st = ReadValues::new(
self.series as i64,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
[befores[befores.len() - 2]].into(),
false,
self.scy.clone(),
);
self.state = FrState::ReadBack2(st);
None
} else if self.ts_msps.len() >= 1 {
let st = ReadValues::new(
self.series as i64,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
self.ts_msps.clone(),
true,
self.scy.clone(),
);
self.state = FrState::ReadValues(st);
None
} else {
self.state = FrState::Done;
None
}
} else {
if self.ts_msps.len() > 0 {
let st = ReadValues::new(
self.series as i64,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
self.ts_msps.clone(),
true,
self.scy.clone(),
);
self.state = FrState::ReadValues(st);
Some(item)
} else {
self.state = FrState::Done;
Some(item)
}
}
}
fn back_2_done(&mut self, item: Box<dyn Events>) -> Option<Box<dyn Events>> {
info!("back_2_done len {}", item.len());
if self.ts_msps.len() >= 1 {
let st = ReadValues::new(
self.series as i64,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
self.ts_msps.clone(),
true,
self.scy.clone(),
);
self.state = FrState::ReadValues(st);
} else {
self.state = FrState::Done;
}
if item.len() > 0 {
Some(item)
} else {
None
}
}
}
impl Stream for EventsStreamScylla {
type Item = Result<ChannelEvents, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.do_test_stream_error {
let e = Error::with_msg(format!("Test PRIVATE STREAM error."))
.add_public_msg(format!("Test PUBLIC STREAM error."));
return Ready(Some(Err(e)));
}
loop {
break match self.state {
FrState::New => {
let fut = find_ts_msp(self.series as i64, self.range.clone(), self.scy.clone());
let fut = Box::pin(fut);
self.state = FrState::FindMsp(fut);
continue;
}
FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) {
Ready(Ok(ts_msps)) => {
self.ts_msps_found(ts_msps);
continue;
}
Ready(Err(e)) => {
self.state = FrState::Done;
Ready(Some(Err(e)))
}
Pending => Pending,
},
FrState::ReadBack1(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(Ok(item)) => {
if let Some(item) = self.back_1_done(item) {
item.verify();
item.output_info();
Ready(Some(Ok(ChannelEvents::Events(item))))
} else {
continue;
}
}
Ready(Err(e)) => {
self.state = FrState::Done;
Ready(Some(Err(e)))
}
Pending => Pending,
},
FrState::ReadBack2(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(Ok(item)) => {
if let Some(item) = self.back_2_done(item) {
item.verify();
item.output_info();
Ready(Some(Ok(ChannelEvents::Events(item))))
} else {
continue;
}
}
Ready(Err(e)) => {
self.state = FrState::Done;
Ready(Some(Err(e)))
}
Pending => Pending,
},
FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(Ok(item)) => {
info!("read values");
item.verify();
item.output_info();
if !st.next() {
info!("ReadValues exhausted");
self.state = FrState::Done;
}
Ready(Some(Ok(ChannelEvents::Events(item))))
}
Ready(Err(e)) => Ready(Some(Err(e))),
Pending => Pending,
},
FrState::Done => Ready(None),
};
}
}
}
async fn find_series(channel: &Channel, pgclient: Arc<PgClient>) -> Result<(u64, ScalarType, Shape), Error> {
info!("find_series channel {:?}", channel);
let rows = if let Some(series) = channel.series() {
let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where series = $1";
pgclient.query(q, &[&(series as i64)]).await.err_conv()?
} else {
let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2";
pgclient
.query(q, &[&channel.backend(), &channel.name()])
.await
.err_conv()?
};
    if rows.is_empty() {
return Err(Error::with_public_msg_no_trace(format!(
"No series found for {channel:?}"
)));
}
if rows.len() > 1 {
error!("Multiple series found for {channel:?}");
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
let row = rows
.into_iter()
.next()
        .ok_or_else(|| Error::with_public_msg_no_trace("can not find series for channel"))?;
let series = row.get::<_, i64>(0) as u64;
let _facility: String = row.get(1);
let _channel: String = row.get(2);
let a: i32 = row.get(3);
let scalar_type = ScalarType::from_scylla_i32(a)?;
let a: Vec<i32> = row.get(4);
let shape = Shape::from_scylla_shape_dims(&a)?;
Ok((series, scalar_type, shape))
}
async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc<ScySession>) -> Result<VecDeque<u64>, Error> {
trace!("find_ts_msp series {} {:?}", series, range);
// TODO use prepared statements
let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2";
let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?;
let mut before = VecDeque::new();
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
before.push_front(row.0 as u64);
}
let cql = "select ts_msp from ts_msp where series = ? and ts_msp > ? and ts_msp < ?";
let res = scy
.query(cql, (series, range.beg as i64, range.end as i64))
.await
.err_conv()?;
let mut ret = VecDeque::new();
for h in before {
ret.push_back(h);
}
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
ret.push_back(row.0 as u64);
}
trace!("found in total {} rows", ret.len());
Ok(ret)
}
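// Event timestamps are stored split across two columns: `ts_msp` is the
// coarse partition key found above, `ts_lsp` the offset within that
// partition; the readers below reassemble `ts = ts_msp + ts_lsp`.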
macro_rules! read_next_scalar_values {
($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
async fn $fname(
series: i64,
ts_msp: u64,
range: NanoRange,
fwd: bool,
scy: Arc<ScySession>,
) -> Result<EventsDim0<$st>, Error> {
type ST = $st;
type SCYTY = $scyty;
if ts_msp >= range.end {
warn!("given ts_msp {} >= range.end {}", ts_msp, range.end);
}
if range.end > i64::MAX as u64 {
                return Err(Error::with_msg_no_trace("range.end overflows i64"));
}
let res = if fwd {
let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 };
let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 };
trace!(
"FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}",
ts_msp,
ts_lsp_min,
ts_lsp_max,
stringify!($fname)
);
// TODO use prepared!
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?"
);
scy.query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64))
.await
.err_conv()?
} else {
let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 };
info!(
"BCK ts_msp {} ts_lsp_max {} range beg {} end {} {}",
ts_msp,
ts_lsp_max,
range.beg,
range.end,
stringify!($fname)
);
// TODO use prepared!
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1"
);
scy.query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
.await
.err_conv()?
};
let mut ret = EventsDim0::<ST>::empty();
for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2 as ST;
ret.push(ts, pulse, value);
}
trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
Ok(ret)
}
};
}
macro_rules! read_next_array_values {
($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
async fn $fname(
series: i64,
ts_msp: u64,
_range: NanoRange,
_fwd: bool,
scy: Arc<ScySession>,
) -> Result<EventsDim0<$st>, Error> {
// TODO change return type: so far EventsDim1 does not exist.
error!("TODO read_next_array_values");
err::todo();
if true {
return Err(Error::with_msg_no_trace("redo based on scalar case"));
}
type ST = $st;
type _SCYTY = $scyty;
info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ?"
);
let _res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?;
let ret = EventsDim0::<ST>::empty();
/*
for row in res.rows_typed_or_empty::<(i64, i64, Vec<SCYTY>)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2.into_iter().map(|x| x as ST).collect();
ret.push(ts, pulse, value);
}
*/
info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
Ok(ret)
}
};
}
read_next_scalar_values!(read_next_values_scalar_i8, i8, i8, "events_scalar_i8");
read_next_scalar_values!(read_next_values_scalar_i16, i16, i16, "events_scalar_i16");
read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32");
read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32");
read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64");
read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16");
pub async fn make_scylla_stream(
evq: &PlainEventsQuery,
scy: Arc<ScySession>,
pgclient: &Arc<PgClient>,
do_test_stream_error: bool,
) -> Result<Pin<Box<dyn Stream<Item = Result<ChannelEvents, Error>> + Send>>, Error> {
// Need to know details about the series in order to fetch data:
// TODO should query already contain ScalarType and Shape?
let (series, scalar_type, shape) = { find_series(evq.channel(), pgclient.clone()).await? };
let res = Box::pin(EventsStreamScylla::new(
series,
evq,
scalar_type,
shape,
scy,
do_test_stream_error,
)) as _;
Ok(res)
}
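// Hedged usage sketch (construction of evq, scy and pgclient elided):
//
//     let mut stream = make_scylla_stream(&evq, scy, &pgclient, false).await?;
//     while let Some(item) = stream.next().await {
//         match item? {
//             ChannelEvents::Events(ev) => { /* time-bin or serialize */ }
//             ChannelEvents::Status(_) | ChannelEvents::RangeComplete => {}
//         }
//     }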
pub async fn channel_state_events(
evq: &ChannelStateEvents,
scy: Arc<ScySession>,
_dbconf: Database,
) -> Result<Vec<(u64, u32)>, Error> {
let mut ret = Vec::new();
let div = DAY;
let mut ts_msp = evq.range().beg / div * div;
    let series = evq
        .channel()
        .series()
        .ok_or_else(|| Error::with_msg_no_trace("series id not given"))?;
    loop {
        let params = (series as i64, ts_msp as i64);
let mut res = scy
.query_iter(
"select ts_lsp, kind from channel_status where series = ? and ts_msp = ?",
params,
)
.await
.err_conv()?;
while let Some(row) = res.next().await {
let row = row.err_conv()?;
let (ts_lsp, kind): (i64, i32) = row.into_typed().err_conv()?;
let ts = ts_msp + ts_lsp as u64;
let kind = kind as u32;
if ts >= evq.range().beg && ts < evq.range().end {
ret.push((ts, kind));
}
}
ts_msp += DAY;
if ts_msp >= evq.range().end {
break;
}
}
Ok(ret)
}

20
scyllaconn/src/scyllaconn.rs Normal file
View File

@@ -0,0 +1,20 @@
pub mod bincache;
pub mod errconv;
pub mod events;
use err::Error;
use errconv::ErrConv;
use netpod::ScyllaConfig;
use scylla::Session as ScySession;
use std::sync::Arc;
pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result<Arc<ScySession>, Error> {
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)
.use_keyspace(&scyconf.keyspace, true)
.build()
.await
.err_conv()?;
let ret = Arc::new(scy);
Ok(ret)
}