From c54eaa6fcba1396b561e693db892a2992062d4f2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 28 Feb 2023 09:47:27 +0100 Subject: [PATCH] Refactor --- commonio/Cargo.toml | 2 +- commonio/src/commonio.rs | 6 +- daqbufp2/src/client.rs | 2 +- daqbufp2/src/test/binnedbinary.rs | 2 +- daqbufp2/src/test/events.rs | 2 +- disk/src/decode.rs | 8 +- disk/src/eventblobs.rs | 20 +- disk/src/merge.rs | 25 +- disk/src/merge/mergedblobsfromremotes.rs | 4 +- disk/src/raw/conn.rs | 8 +- disk/src/streamlog.rs | 2 +- httpret/src/api1.rs | 62 +- httpret/src/httpret.rs | 5 +- items/Cargo.toml | 7 +- items/src/items.rs | 701 ----------------------- items/src/lib.rs | 163 ++++++ items/src/streams.rs | 12 +- items_0/Cargo.toml | 2 + items_0/src/framable.rs | 19 + items_0/src/isodate.rs | 24 + items_0/src/items_0.rs | 5 +- items_0/src/streamitem.rs | 179 ++++++ items_2/Cargo.toml | 2 + items_2/src/channelevents.rs | 7 +- items_2/src/databuffereventblobs.rs | 24 - {items => items_2}/src/eventfull.rs | 26 +- items_2/src/eventsdim0.rs | 74 +-- items_2/src/framable.rs | 150 +++++ {items => items_2}/src/frame.rs | 27 +- {items => items_2}/src/inmem.rs | 0 items_2/src/items_2.rs | 11 +- items_2/src/merger.rs | 10 +- items_2/src/test.rs | 8 +- nodenet/src/conn.rs | 20 +- nodenet/src/conn/test.rs | 41 +- streams/src/collect.rs | 18 +- streams/src/eventchunker.rs | 23 +- streams/src/frames/eventsfromframes.rs | 10 +- streams/src/frames/inmem.rs | 19 +- streams/src/plaineventsjson.rs | 6 +- streams/src/rangefilter.rs | 22 +- streams/src/rangefilter2.rs | 8 +- streams/src/tcprawclient.rs | 23 +- streams/src/test.rs | 6 +- streams/src/test/collect.rs | 5 +- streams/src/test/timebin.rs | 17 +- streams/src/timebin.rs | 17 +- streams/src/timebinnedjson.rs | 15 +- 48 files changed, 909 insertions(+), 940 deletions(-) delete mode 100644 items/src/items.rs create mode 100644 items/src/lib.rs create mode 100644 items_0/src/framable.rs create mode 100644 items_0/src/isodate.rs create mode 100644 items_0/src/streamitem.rs delete mode 100644 items_2/src/databuffereventblobs.rs rename {items => items_2}/src/eventfull.rs (92%) create mode 100644 items_2/src/framable.rs rename {items => items_2}/src/frame.rs (93%) rename {items => items_2}/src/inmem.rs (100%) diff --git a/commonio/Cargo.toml b/commonio/Cargo.toml index 6a08967..57208d9 100644 --- a/commonio/Cargo.toml +++ b/commonio/Cargo.toml @@ -23,5 +23,5 @@ crc32fast = "1.2" err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } -items = { path = "../items" } +items_0 = { path = "../items_0" } items_proc = { path = "../items_proc" } diff --git a/commonio/src/commonio.rs b/commonio/src/commonio.rs index 836e192..ec79e08 100644 --- a/commonio/src/commonio.rs +++ b/commonio/src/commonio.rs @@ -4,9 +4,9 @@ use async_channel::Sender; use err::ErrStr; use err::Error; use futures_util::StreamExt; -use items::Sitemty; -use items::StatsItem; -use items::StreamItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StatsItem; +use items_0::streamitem::StreamItem; use netpod::log::*; use netpod::DiskStats; use netpod::OpenStats; diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index 81cf981..3b1341c 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -7,7 +7,7 @@ use futures_util::TryStreamExt; use http::StatusCode; use httpclient::HttpBodyAsAsyncRead; use hyper::Body; -use items::StreamItem; +use items_0::streamitem::StreamItem; use netpod::log::*; use netpod::query::BinnedQuery; use netpod::query::CacheUsage; 
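For orientation, the change repeated across the files below is an import-path migration: the stream-item plumbing (Sitemty, StreamItem, RangeCompletableItem, LogItem, StatsItem, sitem_data, and the frame type id constants) moves from the old `items` crate root into `items_0::streamitem`, while frame encoding/decoding, `Framable`, and `EventFull` move into `items_2`; grouped `use` statements are also split into one import per line. A minimal sketch of what an updated call site looks like after this patch — the helper names `example` and `is_data` are placeholders for illustration, not code introduced by the patch:

    use items_0::streamitem::sitem_data;
    use items_0::streamitem::RangeCompletableItem;
    use items_0::streamitem::Sitemty;
    use items_0::streamitem::StreamItem;
    use items_2::eventfull::EventFull;

    // Wrap a decoded event blob the way the updated call sites do;
    // sitem_data(x) is shorthand for Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))).
    fn example(ev: EventFull) -> Sitemty<EventFull> {
        sitem_data(ev)
    }

    // Matching on the wrapped value is unchanged by the refactor; only the use paths differ.
    fn is_data(item: &Sitemty<EventFull>) -> bool {
        matches!(item, Ok(StreamItem::DataItem(RangeCompletableItem::Data(_))))
    }

Call sites that previously used `items::sitem_data` and `items::eventfull::EventFull` only need their `use` lines updated; the shape of the wrapped value is unchanged.
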
diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 467264c..13bcc2a 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -9,7 +9,7 @@ use futures_util::TryStreamExt; use http::StatusCode; use httpclient::HttpBodyAsAsyncRead; use hyper::Body; -use items::StreamItem; +use items_0::streamitem::StreamItem; use items_0::subfr::SubFrId; use netpod::log::*; use netpod::query::BinnedQuery; diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index d5a0b7f..6ed0ef9 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -9,7 +9,7 @@ use futures_util::TryStreamExt; use http::StatusCode; use httpclient::HttpBodyAsAsyncRead; use hyper::Body; -use items::StreamItem; +use items_0::streamitem::StreamItem; use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::AggKind; diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 2f09e79..5fb7c6f 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -2,12 +2,12 @@ use crate::eventblobs::EventChunkerMultifile; use err::Error; use futures_util::Stream; use futures_util::StreamExt; -use items::eventfull::EventFull; -use items::RangeCompletableItem; -use items::Sitemty; -use items::StreamItem; use items_0::scalar_ops::ScalarOps; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; use items_0::Events; +use items_2::eventfull::EventFull; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; #[allow(unused)] diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index c847379..a5d2e92 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -5,11 +5,11 @@ use crate::merge::MergedStream; use err::Error; use futures_util::Stream; use futures_util::StreamExt; -use items::eventfull::EventFull; -use items::LogItem; -use items::RangeCompletableItem; -use items::Sitemty; -use items::StreamItem; +use items_0::streamitem::LogItem; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_2::eventfull::EventFull; use netpod::log::*; use netpod::timeunits::SEC; use netpod::ChannelConfig; @@ -262,11 +262,15 @@ mod test { use crate::eventblobs::EventChunkerMultifile; use err::Error; use futures_util::StreamExt; - use items::{RangeCompletableItem, StreamItem}; + use items_0::streamitem::RangeCompletableItem; + use items_0::streamitem::StreamItem; use netpod::log::*; - use netpod::timeunits::{DAY, MS}; + use netpod::timeunits::DAY; + use netpod::timeunits::MS; + use netpod::ByteSize; + use netpod::ChannelConfig; use netpod::DiskIoTune; - use netpod::{ByteSize, ChannelConfig, Nanos}; + use netpod::Nanos; use streams::eventchunker::EventChunkerConf; use streams::rangefilter::RangeFilter; diff --git a/disk/src/merge.rs b/disk/src/merge.rs index cf16f63..e8e83ca 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -5,13 +5,13 @@ use futures_util::Stream; use futures_util::StreamExt; use items::Appendable; use items::ByteEstimate; -use items::LogItem; use items::PushableIndex; -use items::RangeCompletableItem; -use items::Sitemty; -use items::StatsItem; -use items::StreamItem; use items::WithTimestamps; +use items_0::streamitem::LogItem; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StatsItem; +use items_0::streamitem::StreamItem; use netpod::histo::HistoLog2; use netpod::log::*; use netpod::ByteSize; @@ 
-312,11 +312,20 @@ mod test { use crate::merge::MergedStream; use err::Error; use futures_util::StreamExt; - use items::{RangeCompletableItem, StreamItem}; + use items_0::streamitem::RangeCompletableItem; + use items_0::streamitem::StreamItem; use netpod::log::*; use netpod::test_data_base_path_databuffer; - use netpod::timeunits::{DAY, MS}; - use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape}; + use netpod::timeunits::DAY; + use netpod::timeunits::MS; + use netpod::ByteOrder; + use netpod::ByteSize; + use netpod::Channel; + use netpod::ChannelConfig; + use netpod::NanoRange; + use netpod::Nanos; + use netpod::ScalarType; + use netpod::Shape; use std::path::PathBuf; use streams::eventchunker::EventChunker; use streams::eventchunker::EventChunkerConf; diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index 56d46c1..bd22b64 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -3,8 +3,8 @@ use err::Error; use futures_util::pin_mut; use futures_util::Stream; use futures_util::StreamExt; -use items::eventfull::EventFull; -use items::Sitemty; +use items_0::streamitem::Sitemty; +use items_2::eventfull::EventFull; use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::Cluster; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index b532649..df4d8a8 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -3,11 +3,11 @@ use err::Error; use futures_util::stream; use futures_util::Stream; use futures_util::StreamExt; -use items::eventfull::EventFull; -use items::RangeCompletableItem; -use items::Sitemty; -use items::StreamItem; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; +use items_2::eventfull::EventFull; use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::AggKind; diff --git a/disk/src/streamlog.rs b/disk/src/streamlog.rs index 5399715..af6f897 100644 --- a/disk/src/streamlog.rs +++ b/disk/src/streamlog.rs @@ -1,4 +1,4 @@ -use items::LogItem; +use items_0::streamitem::LogItem; use netpod::log::*; use std::collections::VecDeque; diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 6d7c5a9..50f1721 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -1,30 +1,60 @@ use crate::err::Error; -use crate::gather::{gather_get_json_generic, SubRes}; -use crate::{response, BodyStream, ReqCtx}; -use bytes::{BufMut, BytesMut}; -use futures_util::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; -use http::{Method, StatusCode}; -use hyper::{Body, Client, Request, Response}; -use items::eventfull::EventFull; -use items::{RangeCompletableItem, Sitemty, StreamItem}; +use crate::gather::gather_get_json_generic; +use crate::gather::SubRes; +use crate::response; +use crate::BodyStream; +use crate::ReqCtx; +use bytes::BufMut; +use bytes::BytesMut; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use futures_util::TryFutureExt; +use futures_util::TryStreamExt; +use http::Method; +use http::StatusCode; +use hyper::Body; +use hyper::Client; +use hyper::Request; +use hyper::Response; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_2::eventfull::EventFull; use itertools::Itertools; +use netpod::log::*; use netpod::query::api1::Api1Query; use netpod::query::PlainEventsQuery; 
use netpod::timeunits::SEC; -use netpod::{log::*, ScalarType}; -use netpod::{ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, PerfOpts, Shape}; -use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig}; -use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; +use netpod::ByteSize; +use netpod::Channel; +use netpod::ChannelSearchQuery; +use netpod::ChannelSearchResult; +use netpod::DiskIoTune; +use netpod::NanoRange; +use netpod::NodeConfigCached; +use netpod::PerfOpts; +use netpod::ProxyConfig; +use netpod::ScalarType; +use netpod::Shape; +use netpod::ACCEPT_ALL; +use netpod::APP_JSON; +use netpod::APP_OCTET; use parse::channelconfig::extract_matching_config_entry; use parse::channelconfig::read_local_config; -use parse::channelconfig::{Config, ConfigEntry, MatchingConfigEntry}; -use serde::{Deserialize, Serialize}; +use parse::channelconfig::Config; +use parse::channelconfig::ConfigEntry; +use parse::channelconfig::MatchingConfigEntry; +use serde::Deserialize; +use serde::Serialize; use serde_json::Value as JsonValue; use std::fmt; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; use streams::eventchunker::EventChunkerConf; use tracing_futures::Instrument; use url::Url; diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 9cf9f26..90944a1 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -541,6 +541,7 @@ async fn update_db_with_channel_names( } } +#[allow(unused)] async fn update_db_with_channel_names_3( req: Request, _ctx: &ReqCtx, @@ -611,9 +612,9 @@ pub struct StatusBoardEntry { ts_created: SystemTime, #[serde(serialize_with = "instant_serde::ser")] ts_updated: SystemTime, - #[serde(skip_serializing_if = "items::bool_is_false")] + #[serde(skip_serializing_if = "items_2::bool_is_false")] is_error: bool, - #[serde(skip_serializing_if = "items::bool_is_false")] + #[serde(skip_serializing_if = "items_2::bool_is_false")] is_ok: bool, #[serde(skip_serializing_if = "Vec::is_empty")] errors: Vec, diff --git a/items/Cargo.toml b/items/Cargo.toml index e08262c..e471660 100644 --- a/items/Cargo.toml +++ b/items/Cargo.toml @@ -1,19 +1,15 @@ [package] name = "items" -version = "0.0.2" +version = "0.1.0" authors = ["Dominik Werder "] edition = "2021" -[lib] -path = "src/items.rs" - [dependencies] tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } futures-util = "0.3.15" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" ciborium = "0.2" -rmp-serde = "1.1.1" bson = "2.4.0" erased-serde = "0.3" bytes = "1.2.1" @@ -24,4 +20,3 @@ err = { path = "../err" } items_proc = { path = "../items_proc" } items_0 = { path = "../items_0" } netpod = { path = "../netpod" } -parse = { path = "../parse" } diff --git a/items/src/items.rs b/items/src/items.rs deleted file mode 100644 index 6e91b85..0000000 --- a/items/src/items.rs +++ /dev/null @@ -1,701 +0,0 @@ -pub mod eventfull; -pub mod frame; -pub mod inmem; -pub mod streams; - -use crate::frame::make_frame_2; -use bytes::BytesMut; -use chrono::TimeZone; -use chrono::Utc; -use err::Error; -use frame::make_error_frame; -use frame::make_log_frame; -use frame::make_range_complete_frame; -use frame::make_stats_frame; -use items_0::AsAnyRef; -use netpod::log::Level; -#[allow(unused)] -use netpod::log::*; -use netpod::DiskStats; -use netpod::EventDataReadStats; -use netpod::NanoRange; -use 
netpod::RangeFilterStats; -use netpod::Shape; -use serde::de::DeserializeOwned; -use serde::Deserialize; -use serde::Serialize; -use serde::Serializer; -use std::any::Any; -use std::fmt; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -use tokio::fs::File; -use tokio::io::AsyncRead; -use tokio::io::ReadBuf; - -pub const TERM_FRAME_TYPE_ID: u32 = 0xaa01; -pub const ERROR_FRAME_TYPE_ID: u32 = 0xaa02; -pub const SITEMTY_NONSPEC_FRAME_TYPE_ID: u32 = 0xaa04; -pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100; -pub const EVENTS_0D_FRAME_TYPE_ID: u32 = 0x500; -pub const MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID: u32 = 0x700; -pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0x800; -pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; -pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0xb00; -pub const LOG_FRAME_TYPE_ID: u32 = 0xc00; -pub const STATS_FRAME_TYPE_ID: u32 = 0xd00; -pub const RANGE_COMPLETE_FRAME_TYPE_ID: u32 = 0xe00; -pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; -pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300; -pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400; -pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500; -pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; -pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900; -pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00; - -pub fn bool_is_false(j: &bool) -> bool { - *j == false -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum RangeCompletableItem { - RangeComplete, - Data(T), -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum StatsItem { - EventDataReadStats(EventDataReadStats), - RangeFilterStats(RangeFilterStats), - DiskStats(DiskStats), -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum StreamItem { - DataItem(T), - Log(LogItem), - Stats(StatsItem), -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct LogItem { - pub node_ix: u32, - #[serde(with = "levelserde")] - pub level: Level, - pub msg: String, -} - -impl LogItem { - pub fn quick(level: Level, msg: String) -> Self { - Self { - level, - msg, - node_ix: 42, - } - } -} - -pub type Sitemty = Result>, Error>; - -#[macro_export] -macro_rules! 
on_sitemty_range_complete { - ($item:expr, $ex:expr) => { - if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item { - $ex - } - }; -} - -impl FrameType for Sitemty -where - T: FrameType, -{ - fn frame_type_id(&self) -> u32 { - match self { - Ok(item) => match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => SITEMTY_NONSPEC_FRAME_TYPE_ID, - RangeCompletableItem::Data(item) => item.frame_type_id(), - }, - StreamItem::Log(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID, - StreamItem::Stats(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID, - }, - Err(_) => ERROR_FRAME_TYPE_ID, - } - } -} - -pub fn sitem_data(x: X) -> Sitemty { - Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) -} - -mod levelserde { - use super::Level; - use serde::de::{self, Visitor}; - use serde::{Deserializer, Serializer}; - use std::fmt; - - pub fn serialize(t: &Level, se: S) -> Result - where - S: Serializer, - { - let g = match *t { - Level::ERROR => 1, - Level::WARN => 2, - Level::INFO => 3, - Level::DEBUG => 4, - Level::TRACE => 5, - }; - se.serialize_u32(g) - } - - struct VisitLevel; - - impl VisitLevel { - fn from_u32(x: u32) -> Level { - match x { - 1 => Level::ERROR, - 2 => Level::WARN, - 3 => Level::INFO, - 4 => Level::DEBUG, - 5 => Level::TRACE, - _ => Level::TRACE, - } - } - } - - impl<'de> Visitor<'de> for VisitLevel { - type Value = Level; - - fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "expect Level code") - } - - fn visit_u64(self, val: u64) -> Result - where - E: de::Error, - { - Ok(VisitLevel::from_u32(val as _)) - } - } - - pub fn deserialize<'de, D>(de: D) -> Result - where - D: Deserializer<'de>, - { - de.deserialize_u32(VisitLevel) - } -} - -pub const INMEM_FRAME_ENCID: u32 = 0x12121212; -pub const INMEM_FRAME_HEAD: usize = 20; -pub const INMEM_FRAME_FOOT: usize = 4; -pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; - -// Required for any inner type of Sitemty. -pub trait FrameTypeInnerStatic { - const FRAME_TYPE_ID: u32; -} - -// To be implemented by the T of Sitemty, e.g. ScalarEvents. -pub trait FrameTypeInnerDyn { - // TODO check actual usage of this - fn frame_type_id(&self) -> u32; -} - -impl FrameTypeInnerDyn for T -where - T: FrameTypeInnerStatic, -{ - fn frame_type_id(&self) -> u32 { - ::FRAME_TYPE_ID - } -} - -pub trait FrameTypeStatic { - const FRAME_TYPE_ID: u32; -} - -impl FrameTypeStatic for Sitemty -where - T: FrameTypeInnerStatic, -{ - const FRAME_TYPE_ID: u32 = ::FRAME_TYPE_ID; -} - -// Framable trait objects need some inspection to handle the supposed-to-be common Err ser format: -// Meant to be implemented by Sitemty. 
-pub trait FrameType { - fn frame_type_id(&self) -> u32; -} - -impl FrameType for Box -where - T: FrameType, -{ - fn frame_type_id(&self) -> u32 { - self.as_ref().frame_type_id() - } -} - -impl FrameTypeInnerDyn for Box { - fn frame_type_id(&self) -> u32 { - FrameTypeInnerDyn::frame_type_id(self.as_time_binnable_dyn()) - } -} - -impl FrameTypeInnerDyn for Box { - fn frame_type_id(&self) -> u32 { - FrameTypeInnerDyn::frame_type_id(self.as_time_binnable_dyn()) - } -} - -pub trait ContainsError { - fn is_err(&self) -> bool; - fn err(&self) -> Option<&::err::Error>; -} - -impl ContainsError for Box -where - T: ContainsError, -{ - fn is_err(&self) -> bool { - self.as_ref().is_err() - } - - fn err(&self) -> Option<&::err::Error> { - self.as_ref().err() - } -} - -impl ContainsError for Sitemty { - fn is_err(&self) -> bool { - match self { - Ok(_) => false, - Err(_) => true, - } - } - - fn err(&self) -> Option<&::err::Error> { - match self { - Ok(_) => None, - Err(e) => Some(e), - } - } -} - -pub trait Framable { - fn make_frame(&self) -> Result; -} - -pub trait FramableInner: erased_serde::Serialize + FrameTypeInnerDyn + Send { - fn _dummy(&self); -} - -impl FramableInner for T { - fn _dummy(&self) {} -} - -erased_serde::serialize_trait_object!(EventsDyn); -erased_serde::serialize_trait_object!(TimeBinnableDyn); -erased_serde::serialize_trait_object!(TimeBinned); - -impl Framable for Sitemty -where - T: Sized + serde::Serialize + FrameType, -{ - fn make_frame(&self) -> Result { - match self { - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => { - let frame_type_id = k.frame_type_id(); - make_frame_2(self, frame_type_id) - } - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => make_range_complete_frame(), - Ok(StreamItem::Log(item)) => make_log_frame(item), - Ok(StreamItem::Stats(item)) => make_stats_frame(item), - Err(e) => make_error_frame(e), - } - } -} - -impl Framable for Box -where - T: Framable + ?Sized, -{ - fn make_frame(&self) -> Result { - self.as_ref().make_frame() - } -} - -pub trait FrameDecodable: FrameTypeStatic + DeserializeOwned { - fn from_error(e: ::err::Error) -> Self; - fn from_log(item: LogItem) -> Self; - fn from_stats(item: StatsItem) -> Self; - fn from_range_complete() -> Self; -} - -impl FrameDecodable for Sitemty -where - T: FrameTypeInnerStatic + DeserializeOwned, -{ - fn from_error(e: err::Error) -> Self { - Err(e) - } - - fn from_log(item: LogItem) -> Self { - Ok(StreamItem::Log(item)) - } - - fn from_stats(item: StatsItem) -> Self { - Ok(StreamItem::Stats(item)) - } - - fn from_range_complete() -> Self { - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } -} - -#[derive(Serialize, Deserialize)] -pub struct EventQueryJsonStringFrame(pub String); - -impl FrameTypeInnerStatic for EventQueryJsonStringFrame { - const FRAME_TYPE_ID: u32 = EVENT_QUERY_JSON_STRING_FRAME; -} - -impl FrameType for EventQueryJsonStringFrame { - fn frame_type_id(&self) -> u32 { - EventQueryJsonStringFrame::FRAME_TYPE_ID - } -} - -#[derive(Clone, Debug, Deserialize)] -pub struct IsoDateTime(chrono::DateTime); - -impl Serialize for IsoDateTime { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string()) - } -} - -pub fn make_iso_ts(tss: &[u64]) -> Vec { - tss.iter() - .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) - .collect() -} - -pub enum Fits { - Empty, - Lower, - Greater, - Inside, - PartlyLower, - PartlyGreater, - PartlyLowerAndGreater, -} - -pub 
trait WithLen { - fn len(&self) -> usize; -} - -pub trait WithTimestamps { - fn ts(&self, ix: usize) -> u64; -} - -pub trait ByteEstimate { - fn byte_estimate(&self) -> u64; -} - -pub trait RangeOverlapInfo { - // TODO do not take by value. - fn ends_before(&self, range: NanoRange) -> bool; - fn ends_after(&self, range: NanoRange) -> bool; - fn starts_after(&self, range: NanoRange) -> bool; -} - -pub trait FitsInside { - fn fits_inside(&self, range: NanoRange) -> Fits; -} - -pub trait FilterFittingInside: Sized { - fn filter_fitting_inside(self, fit_range: NanoRange) -> Option; -} - -pub trait PushableIndex { - // TODO get rid of usage, involves copy. - // TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop? - fn push_index(&mut self, src: &Self, ix: usize); -} - -pub trait NewEmpty { - fn empty(shape: Shape) -> Self; -} - -pub trait Appendable: WithLen { - fn empty_like_self(&self) -> Self; - - // TODO get rid of usage, involves copy. - fn append(&mut self, src: &Self); - - // TODO the `ts2` makes no sense for non-bin-implementors - fn append_zero(&mut self, ts1: u64, ts2: u64); -} - -pub trait Clearable { - fn clear(&mut self); -} - -pub trait EventAppendable -where - Self: Sized, -{ - type Value; - fn append_event(ret: Option, ts: u64, pulse: u64, value: Self::Value) -> Self; -} - -pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside { - fn ts1s(&self) -> &Vec; - fn ts2s(&self) -> &Vec; -} - -pub trait TimeBinnableType: - Send - + Unpin - + RangeOverlapInfo - + FilterFittingInside - + NewEmpty - + Appendable - + Serialize - + DeserializeOwned - + ReadableFromFile - + FrameTypeInnerStatic -{ - type Output: TimeBinnableType; - type Aggregator: TimeBinnableTypeAggregator + Send + Unpin; - fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator; -} - -/// Provides a time-binned representation of the implementing type. -/// In contrast to `TimeBinnableType` this is meant for trait objects. - -// TODO should not require Sync! -// TODO SitemtyFrameType is already supertrait of FramableInner. -pub trait TimeBinnableDyn: - fmt::Debug - + FramableInner - + FrameType - + FrameTypeInnerDyn - + WithLen - + RangeOverlapInfo - + Any - + AsAnyRef - + Sync - + Send - + 'static -{ - fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box; -} - -pub trait TimeBinnableDynStub: - fmt::Debug - + FramableInner - + FrameType - + FrameTypeInnerDyn - + WithLen - + RangeOverlapInfo - + Any - + AsAnyRef - + Sync - + Send - + 'static -{ -} - -// impl for the stubs TODO: remove -impl TimeBinnableDyn for T -where - T: TimeBinnableDynStub, -{ - fn time_binner_new(&self, _edges: Vec, _do_time_weight: bool) -> Box { - error!("TODO impl time_binner_new for T {}", std::any::type_name::()); - err::todoval() - } -} - -// TODO maybe this is no longer needed: -pub trait TimeBinnableDynAggregator: Send { - fn ingest(&mut self, item: &dyn TimeBinnableDyn); - fn result(&mut self) -> Box; -} - -/// Container of some form of events, for use as trait object. -pub trait EventsDyn: TimeBinnableDyn { - fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn; - fn verify(&self); - fn output_info(&self); -} - -/// Data in time-binned form. 
-pub trait TimeBinned: TimeBinnableDyn { - fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn; - fn edges_slice(&self) -> (&[u64], &[u64]); - fn counts(&self) -> &[u64]; - fn mins(&self) -> Vec; - fn maxs(&self) -> Vec; - fn avgs(&self) -> Vec; - fn validate(&self) -> Result<(), String>; -} - -impl FrameType for Box { - fn frame_type_id(&self) -> u32 { - FrameType::frame_type_id(self.as_ref()) - } -} - -impl WithLen for Box { - fn len(&self) -> usize { - self.as_time_binnable_dyn().len() - } -} - -impl RangeOverlapInfo for Box { - fn ends_before(&self, range: NanoRange) -> bool { - self.as_time_binnable_dyn().ends_before(range) - } - - fn ends_after(&self, range: NanoRange) -> bool { - self.as_time_binnable_dyn().ends_after(range) - } - - fn starts_after(&self, range: NanoRange) -> bool { - self.as_time_binnable_dyn().starts_after(range) - } -} - -impl TimeBinnableDyn for Box { - fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box { - self.as_time_binnable_dyn().time_binner_new(edges, do_time_weight) - } -} - -// TODO should get I/O and tokio dependence out of this crate -pub trait ReadableFromFile: Sized { - fn read_from_file(file: File) -> Result, Error>; - // TODO should not need this: - fn from_buf(buf: &[u8]) -> Result; -} - -// TODO should get I/O and tokio dependence out of this crate -pub struct ReadPbv -where - T: ReadableFromFile, -{ - buf: Vec, - all: Vec, - file: Option, - _m1: PhantomData, -} - -impl ReadPbv -where - T: ReadableFromFile, -{ - pub fn new(file: File) -> Self { - Self { - // TODO make buffer size a parameter: - buf: vec![0; 1024 * 32], - all: vec![], - file: Some(file), - _m1: PhantomData, - } - } -} - -impl Future for ReadPbv -where - T: ReadableFromFile + Unpin, -{ - type Output = Result>, Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use Poll::*; - let mut buf = std::mem::replace(&mut self.buf, Vec::new()); - let ret = 'outer: loop { - let mut dst = ReadBuf::new(&mut buf); - if dst.remaining() == 0 || dst.capacity() == 0 { - break Ready(Err(Error::with_msg("bad read buffer"))); - } - let fp = self.file.as_mut().unwrap(); - let f = Pin::new(fp); - break match File::poll_read(f, cx, &mut dst) { - Ready(res) => match res { - Ok(_) => { - if dst.filled().len() > 0 { - self.all.extend_from_slice(dst.filled()); - continue 'outer; - } else { - match T::from_buf(&mut self.all) { - Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))), - Err(e) => Ready(Err(e)), - } - } - } - Err(e) => Ready(Err(e.into())), - }, - Pending => Pending, - }; - }; - self.buf = buf; - ret - } -} - -pub trait TimeBinnableTypeAggregator: Send { - type Input: TimeBinnableType; - type Output: TimeBinnableType; - fn range(&self) -> &NanoRange; - fn ingest(&mut self, item: &Self::Input); - // TODO this API is too convoluted for a minimal performance gain: should separate `result` and `reset` - // or simply require to construct a new which is almost equally expensive. 
- fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output; -} - -pub trait TimestampInspectable: WithTimestamps + WithLen {} - -impl TimestampInspectable for T where T: WithTimestamps + WithLen {} - -pub fn inspect_timestamps(events: &dyn TimestampInspectable, range: NanoRange) -> String { - use fmt::Write; - let rd = range.delta(); - let mut buf = String::new(); - let n = events.len(); - for i in 0..n { - if i < 3 || i > (n - 4) { - let ts = events.ts(i); - let z = ts - range.beg; - let z = z as f64 / rd as f64 * 2.0 - 1.0; - write!(&mut buf, "i {:3} tt {:6.3}\n", i, z).unwrap(); - } - } - buf -} - -pub trait TimeBinnerDyn: Send { - fn bins_ready_count(&self) -> usize; - fn bins_ready(&mut self) -> Option>; - fn ingest(&mut self, item: &dyn TimeBinnableDyn); - - /// If there is a bin in progress with non-zero count, push it to the result set. - /// With push_empty == true, a bin in progress is pushed even if it contains no counts. - fn push_in_progress(&mut self, push_empty: bool); - - /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call - /// to `push_in_progress` did not change the result count, as long as edges are left. - /// The next call to `Self::bins_ready_count` must return one higher count than before. - fn cycle(&mut self); -} diff --git a/items/src/lib.rs b/items/src/lib.rs new file mode 100644 index 0000000..58a62f9 --- /dev/null +++ b/items/src/lib.rs @@ -0,0 +1,163 @@ +pub mod streams; + +use err::Error; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::StreamItem; +#[allow(unused)] +use netpod::log::*; +use netpod::NanoRange; +use netpod::Shape; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use tokio::fs::File; +use tokio::io::AsyncRead; +use tokio::io::ReadBuf; + +pub enum Fits { + Empty, + Lower, + Greater, + Inside, + PartlyLower, + PartlyGreater, + PartlyLowerAndGreater, +} + +pub trait WithLen { + fn len(&self) -> usize; +} + +pub trait WithTimestamps { + fn ts(&self, ix: usize) -> u64; +} + +pub trait ByteEstimate { + fn byte_estimate(&self) -> u64; +} + +pub trait RangeOverlapInfo { + // TODO do not take by value. + fn ends_before(&self, range: NanoRange) -> bool; + fn ends_after(&self, range: NanoRange) -> bool; + fn starts_after(&self, range: NanoRange) -> bool; +} + +pub trait FitsInside { + fn fits_inside(&self, range: NanoRange) -> Fits; +} + +pub trait FilterFittingInside: Sized { + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option; +} + +pub trait PushableIndex { + // TODO get rid of usage, involves copy. + // TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop? + fn push_index(&mut self, src: &Self, ix: usize); +} + +pub trait NewEmpty { + fn empty(shape: Shape) -> Self; +} + +pub trait Appendable: WithLen { + fn empty_like_self(&self) -> Self; + + // TODO get rid of usage, involves copy. 
+ fn append(&mut self, src: &Self); + + // TODO the `ts2` makes no sense for non-bin-implementors + fn append_zero(&mut self, ts1: u64, ts2: u64); +} + +pub trait Clearable { + fn clear(&mut self); +} + +pub trait EventAppendable +where + Self: Sized, +{ + type Value; + fn append_event(ret: Option, ts: u64, pulse: u64, value: Self::Value) -> Self; +} + +pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside { + fn ts1s(&self) -> &Vec; + fn ts2s(&self) -> &Vec; +} + +// TODO should get I/O and tokio dependence out of this crate +pub trait ReadableFromFile: Sized { + fn read_from_file(file: File) -> Result, Error>; + // TODO should not need this: + fn from_buf(buf: &[u8]) -> Result; +} + +// TODO should get I/O and tokio dependence out of this crate +pub struct ReadPbv +where + T: ReadableFromFile, +{ + buf: Vec, + all: Vec, + file: Option, + _m1: PhantomData, +} + +impl ReadPbv +where + T: ReadableFromFile, +{ + pub fn new(file: File) -> Self { + Self { + // TODO make buffer size a parameter: + buf: vec![0; 1024 * 32], + all: vec![], + file: Some(file), + _m1: PhantomData, + } + } +} + +impl Future for ReadPbv +where + T: ReadableFromFile + Unpin, +{ + type Output = Result>, Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + let mut buf = std::mem::replace(&mut self.buf, Vec::new()); + let ret = 'outer: loop { + let mut dst = ReadBuf::new(&mut buf); + if dst.remaining() == 0 || dst.capacity() == 0 { + break Ready(Err(Error::with_msg("bad read buffer"))); + } + let fp = self.file.as_mut().unwrap(); + let f = Pin::new(fp); + break match File::poll_read(f, cx, &mut dst) { + Ready(res) => match res { + Ok(_) => { + if dst.filled().len() > 0 { + self.all.extend_from_slice(dst.filled()); + continue 'outer; + } else { + match T::from_buf(&mut self.all) { + Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))), + Err(e) => Ready(Err(e)), + } + } + } + Err(e) => Ready(Err(e.into())), + }, + Pending => Pending, + }; + }; + self.buf = buf; + ret + } +} diff --git a/items/src/streams.rs b/items/src/streams.rs index 3494896..c18d002 100644 --- a/items/src/streams.rs +++ b/items/src/streams.rs @@ -1,7 +1,13 @@ -use crate::{RangeCompletableItem, Sitemty, StreamItem, WithLen}; +use crate::RangeCompletableItem; +use crate::StreamItem; +use crate::WithLen; use err::Error; -use futures_util::{Stream, StreamExt}; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StatsItem; use netpod::log::*; +use netpod::DiskStats; use serde::Serialize; use serde_json::Value as JsonValue; use std::fmt; @@ -98,8 +104,6 @@ where } } StreamItem::Stats(item) => { - use crate::StatsItem; - use netpod::DiskStats; match item { // TODO factor and simplify the stats collection: StatsItem::EventDataReadStats(_) => {} diff --git a/items_0/Cargo.toml b/items_0/Cargo.toml index 4e2c6f8..78cf410 100644 --- a/items_0/Cargo.toml +++ b/items_0/Cargo.toml @@ -12,5 +12,7 @@ serde = { version = "1.0", features = ["derive"] } erased-serde = "0.3" serde_json = "1.0" bincode = "1.3.3" +bytes = "1.2.1" +chrono = { version = "0.4.19", features = ["serde"] } netpod = { path = "../netpod" } err = { path = "../err" } diff --git a/items_0/src/framable.rs b/items_0/src/framable.rs new file mode 100644 index 0000000..5fd10cf --- /dev/null +++ b/items_0/src/framable.rs @@ -0,0 +1,19 @@ +// Required for any inner type of Sitemty. 
+pub trait FrameTypeInnerStatic { + const FRAME_TYPE_ID: u32; +} + +// To be implemented by the T of Sitemty, e.g. ScalarEvents. +pub trait FrameTypeInnerDyn { + // TODO check actual usage of this + fn frame_type_id(&self) -> u32; +} + +impl FrameTypeInnerDyn for T +where + T: FrameTypeInnerStatic, +{ + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } +} diff --git a/items_0/src/isodate.rs b/items_0/src/isodate.rs new file mode 100644 index 0000000..daeffd7 --- /dev/null +++ b/items_0/src/isodate.rs @@ -0,0 +1,24 @@ +use chrono::DateTime; +use chrono::TimeZone; +use chrono::Utc; +use serde::Deserialize; +use serde::Serialize; +use serde::Serializer; + +#[derive(Clone, Debug, Deserialize)] +pub struct IsoDateTime(DateTime); + +impl Serialize for IsoDateTime { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string()) + } +} + +pub fn make_iso_ts(tss: &[u64]) -> Vec { + tss.iter() + .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) + .collect() +} diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index e8a571d..985deb5 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -1,6 +1,9 @@ pub mod collect_c; pub mod collect_s; +pub mod framable; +pub mod isodate; pub mod scalar_ops; +pub mod streamitem; pub mod subfr; pub mod bincode { @@ -91,7 +94,7 @@ where } /// Data in time-binned form. -pub trait TimeBinned: Any + TimeBinnable + crate::collect_c::Collectable { +pub trait TimeBinned: Any + TimeBinnable + crate::collect_c::Collectable + erased_serde::Serialize { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; fn as_collectable_mut(&mut self) -> &mut dyn Collectable; fn edges_slice(&self) -> (&[u64], &[u64]); diff --git a/items_0/src/streamitem.rs b/items_0/src/streamitem.rs new file mode 100644 index 0000000..ade3362 --- /dev/null +++ b/items_0/src/streamitem.rs @@ -0,0 +1,179 @@ +use crate::TimeBinned; +use err::Error; +use netpod::log::Level; +use netpod::DiskStats; +use netpod::EventDataReadStats; +use netpod::RangeFilterStats; +use serde::Deserialize; +use serde::Serialize; + +pub const TERM_FRAME_TYPE_ID: u32 = 0xaa01; +pub const ERROR_FRAME_TYPE_ID: u32 = 0xaa02; +pub const SITEMTY_NONSPEC_FRAME_TYPE_ID: u32 = 0xaa04; +pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100; +pub const EVENTS_0D_FRAME_TYPE_ID: u32 = 0x500; +pub const MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID: u32 = 0x700; +pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0x800; +pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; +pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0xb00; +pub const LOG_FRAME_TYPE_ID: u32 = 0xc00; +pub const STATS_FRAME_TYPE_ID: u32 = 0xd00; +pub const RANGE_COMPLETE_FRAME_TYPE_ID: u32 = 0xe00; +pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; +pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300; +pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400; +pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500; +pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; +pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900; +pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00; + +pub fn bool_is_false(j: &bool) -> bool { + *j == false +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum RangeCompletableItem { + RangeComplete, + Data(T), +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum StatsItem { + EventDataReadStats(EventDataReadStats), + RangeFilterStats(RangeFilterStats), + DiskStats(DiskStats), +} + 
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum StreamItem { + DataItem(T), + Log(LogItem), + Stats(StatsItem), +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct LogItem { + pub node_ix: u32, + #[serde(with = "levelserde")] + pub level: Level, + pub msg: String, +} + +impl LogItem { + pub fn quick(level: Level, msg: String) -> Self { + Self { + level, + msg, + node_ix: 42, + } + } +} + +pub type Sitemty = Result>, Error>; + +#[macro_export] +macro_rules! on_sitemty_range_complete { + ($item:expr, $ex:expr) => { + if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item { + $ex + } + }; +} + +pub fn sitem_data(x: X) -> Sitemty { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) +} + +mod levelserde { + use super::Level; + use serde::de::{self, Visitor}; + use serde::{Deserializer, Serializer}; + use std::fmt; + + pub fn serialize(t: &Level, se: S) -> Result + where + S: Serializer, + { + let g = match *t { + Level::ERROR => 1, + Level::WARN => 2, + Level::INFO => 3, + Level::DEBUG => 4, + Level::TRACE => 5, + }; + se.serialize_u32(g) + } + + struct VisitLevel; + + impl VisitLevel { + fn from_u32(x: u32) -> Level { + match x { + 1 => Level::ERROR, + 2 => Level::WARN, + 3 => Level::INFO, + 4 => Level::DEBUG, + 5 => Level::TRACE, + _ => Level::TRACE, + } + } + } + + impl<'de> Visitor<'de> for VisitLevel { + type Value = Level; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "expect Level code") + } + + fn visit_u64(self, val: u64) -> Result + where + E: de::Error, + { + Ok(VisitLevel::from_u32(val as _)) + } + } + + pub fn deserialize<'de, D>(de: D) -> Result + where + D: Deserializer<'de>, + { + de.deserialize_u32(VisitLevel) + } +} + +pub trait ContainsError { + fn is_err(&self) -> bool; + fn err(&self) -> Option<&::err::Error>; +} + +impl ContainsError for Box +where + T: ContainsError, +{ + fn is_err(&self) -> bool { + self.as_ref().is_err() + } + + fn err(&self) -> Option<&::err::Error> { + self.as_ref().err() + } +} + +impl ContainsError for Sitemty { + fn is_err(&self) -> bool { + match self { + Ok(_) => false, + Err(_) => true, + } + } + + fn err(&self) -> Option<&::err::Error> { + match self { + Ok(_) => None, + Err(e) => Some(e), + } + } +} + +erased_serde::serialize_trait_object!(TimeBinned); diff --git a/items_2/Cargo.toml b/items_2/Cargo.toml index da49755..bc4ca4a 100644 --- a/items_2/Cargo.toml +++ b/items_2/Cargo.toml @@ -20,9 +20,11 @@ crc32fast = "1.3.2" futures-util = "0.3.24" tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] } humantime-serde = "1.1.1" +rmp-serde = "1.1.1" err = { path = "../err" } items = { path = "../items" } items_0 = { path = "../items_0" } items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } taskrun = { path = "../taskrun" } +parse = { path = "../parse" } diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index fc77ad8..bf4b339 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -1,9 +1,10 @@ +use crate::framable::FrameType; use crate::merger; use crate::Events; -use items::FrameType; -use items::FrameTypeInnerStatic; use items_0::collect_s::Collectable; use items_0::collect_s::Collector; +use items_0::framable::FrameTypeInnerStatic; +use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; use items_0::AsAnyMut; use items_0::AsAnyRef; use netpod::log::*; @@ -88,7 +89,7 @@ pub enum ChannelEvents { } impl FrameTypeInnerStatic for 
ChannelEvents { - const FRAME_TYPE_ID: u32 = items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; + const FRAME_TYPE_ID: u32 = ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; } impl FrameType for ChannelEvents { diff --git a/items_2/src/databuffereventblobs.rs b/items_2/src/databuffereventblobs.rs deleted file mode 100644 index c1eaeea..0000000 --- a/items_2/src/databuffereventblobs.rs +++ /dev/null @@ -1,24 +0,0 @@ -use items::FrameType; -use items::FrameTypeInnerStatic; -use serde::Serialize; - -pub struct DatabufferEventBlob {} - -impl FrameTypeInnerStatic for DatabufferEventBlob { - const FRAME_TYPE_ID: u32 = items::DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID; -} - -impl FrameType for DatabufferEventBlob { - fn frame_type_id(&self) -> u32 { - ::FRAME_TYPE_ID - } -} - -impl Serialize for DatabufferEventBlob { - fn serialize(&self, _serializer: S) -> Result - where - S: serde::Serializer, - { - todo!() - } -} diff --git a/items/src/eventfull.rs b/items_2/src/eventfull.rs similarity index 92% rename from items/src/eventfull.rs rename to items_2/src/eventfull.rs index 6b62a16..7685343 100644 --- a/items/src/eventfull.rs +++ b/items_2/src/eventfull.rs @@ -1,12 +1,12 @@ -use crate::Appendable; -use crate::ByteEstimate; -use crate::Clearable; -use crate::FrameType; -use crate::FrameTypeInnerStatic; -use crate::PushableIndex; -use crate::WithLen; -use crate::WithTimestamps; +use crate::framable::FrameType; use bytes::BytesMut; +use items::ByteEstimate; +use items::Clearable; +use items::PushableIndex; +use items::WithTimestamps; +use items_0::framable::FrameTypeInnerStatic; +use items_0::streamitem::EVENT_FULL_FRAME_TYPE_ID; +use items_0::WithLen; use netpod::ScalarType; use netpod::Shape; use parse::channelconfig::CompressionMethod; @@ -123,7 +123,7 @@ impl EventFull { } impl FrameTypeInnerStatic for EventFull { - const FRAME_TYPE_ID: u32 = crate::EVENT_FULL_FRAME_TYPE_ID; + const FRAME_TYPE_ID: u32 = EVENT_FULL_FRAME_TYPE_ID; } impl FrameType for EventFull { @@ -138,7 +138,13 @@ impl WithLen for EventFull { } } -impl Appendable for EventFull { +impl items::WithLen for EventFull { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl items::Appendable for EventFull { fn empty_like_self(&self) -> Self { Self::empty() } diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index e2fba61..15b9acf 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1155,16 +1155,15 @@ impl items_0::collect_c::Collectable for EventsDim0 { #[cfg(test)] mod test_frame { + use super::*; use crate::channelevents::ChannelEvents; - use crate::eventsdim0::EventsDim0; - use err::Error; - use items::Framable; - use items::RangeCompletableItem; - use items::Sitemty; - use items::StreamItem; - use items_0::AsAnyMut; - use items_0::Empty; - use items_0::Events; + use crate::framable::Framable; + use crate::framable::INMEM_FRAME_ENCID; + use crate::frame::decode_frame; + use crate::inmem::InMemoryFrame; + use items_0::streamitem::RangeCompletableItem; + use items_0::streamitem::Sitemty; + use items_0::streamitem::StreamItem; #[test] fn events_bincode() { @@ -1180,13 +1179,13 @@ mod test_frame { let s = String::from_utf8_lossy(&buf[20..buf.len() - 4]); eprintln!("[[{s}]]"); let buflen = buf.len(); - let frame = items::inmem::InMemoryFrame { - encid: items::INMEM_FRAME_ENCID, + let frame = InMemoryFrame { + encid: INMEM_FRAME_ENCID, tyid: 0x2500, len: (buflen - 24) as _, buf: buf.split_off(20).split_to(buflen - 20 - 4).freeze(), }; - let item: Sitemty = items::frame::decode_frame(&frame).unwrap(); + let item: 
Sitemty = decode_frame(&frame).unwrap(); let item = if let Ok(x) = item { x } else { panic!() }; let item = if let StreamItem::DataItem(x) = item { x @@ -1208,31 +1207,34 @@ mod test_frame { } else { panic!() }; - eprintln!("NOW WE SEE: {:?}", item); - // type_name_of_val alloc::boxed::Box - eprintln!("0 {:22?}", item.as_any_mut().type_id()); - eprintln!("A {:22?}", std::any::TypeId::of::>()); - eprintln!("B {:22?}", std::any::TypeId::of::()); - eprintln!("C {:22?}", std::any::TypeId::of::<&dyn items_0::Events>()); - eprintln!("D {:22?}", std::any::TypeId::of::<&mut dyn items_0::Events>()); - eprintln!("E {:22?}", std::any::TypeId::of::<&mut Box>()); - eprintln!("F {:22?}", std::any::TypeId::of::>>()); - eprintln!("G {:22?}", std::any::TypeId::of::<&EventsDim0>()); - eprintln!("H {:22?}", std::any::TypeId::of::<&mut EventsDim0>()); - eprintln!("I {:22?}", std::any::TypeId::of::>>>()); - //let item = item.as_mut(); - //eprintln!("1 {:22?}", item.type_id()); - /* - let item = if let Some(item) = - items_0::collect_s::Collectable::as_any_mut(item).downcast_ref::>>() - { - item - } else { - panic!() - }; - */ - eprintln!("Final value: {item:?}"); assert_eq!(item.tss(), &[123]); + #[cfg(DISABLED)] + { + eprintln!("NOW WE SEE: {:?}", item); + // type_name_of_val alloc::boxed::Box + eprintln!("0 {:22?}", item.as_any_mut().type_id()); + eprintln!("A {:22?}", std::any::TypeId::of::>()); + eprintln!("B {:22?}", std::any::TypeId::of::()); + eprintln!("C {:22?}", std::any::TypeId::of::<&dyn items_0::Events>()); + eprintln!("D {:22?}", std::any::TypeId::of::<&mut dyn items_0::Events>()); + eprintln!("E {:22?}", std::any::TypeId::of::<&mut Box>()); + eprintln!("F {:22?}", std::any::TypeId::of::>>()); + eprintln!("G {:22?}", std::any::TypeId::of::<&EventsDim0>()); + eprintln!("H {:22?}", std::any::TypeId::of::<&mut EventsDim0>()); + eprintln!("I {:22?}", std::any::TypeId::of::>>>()); + //let item = item.as_mut(); + //eprintln!("1 {:22?}", item.type_id()); + /* + let item = if let Some(item) = + items_0::collect_s::Collectable::as_any_mut(item).downcast_ref::>>() + { + item + } else { + panic!() + }; + */ + //eprintln!("Final value: {item:?}"); + } } } diff --git a/items_2/src/framable.rs b/items_2/src/framable.rs new file mode 100644 index 0000000..4cdaf6e --- /dev/null +++ b/items_2/src/framable.rs @@ -0,0 +1,150 @@ +use crate::frame::make_error_frame; +use crate::frame::make_frame_2; +use crate::frame::make_log_frame; +use crate::frame::make_range_complete_frame; +use crate::frame::make_stats_frame; +use bytes::BytesMut; +use err::Error; +use items_0::framable::FrameTypeInnerDyn; +use items_0::framable::FrameTypeInnerStatic; +use items_0::streamitem::LogItem; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StatsItem; +use items_0::streamitem::StreamItem; +use items_0::streamitem::ERROR_FRAME_TYPE_ID; +use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME; +use items_0::streamitem::SITEMTY_NONSPEC_FRAME_TYPE_ID; +use serde::de::DeserializeOwned; +use serde::Deserialize; +use serde::Serialize; + +pub const INMEM_FRAME_ENCID: u32 = 0x12121212; +pub const INMEM_FRAME_HEAD: usize = 20; +pub const INMEM_FRAME_FOOT: usize = 4; +pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; + +pub trait FrameTypeStatic { + const FRAME_TYPE_ID: u32; +} + +impl FrameTypeStatic for Sitemty +where + T: FrameTypeInnerStatic, +{ + const FRAME_TYPE_ID: u32 = ::FRAME_TYPE_ID; +} + +// Framable trait objects need some inspection to handle the supposed-to-be common Err ser 
format: +// Meant to be implemented by Sitemty. +pub trait FrameType { + fn frame_type_id(&self) -> u32; +} + +impl FrameType for Box +where + T: FrameType, +{ + fn frame_type_id(&self) -> u32 { + self.as_ref().frame_type_id() + } +} + +pub trait Framable { + fn make_frame(&self) -> Result; +} + +pub trait FramableInner: erased_serde::Serialize + FrameTypeInnerDyn + Send { + fn _dummy(&self); +} + +impl FramableInner for T { + fn _dummy(&self) {} +} + +impl Framable for Sitemty +where + T: Sized + serde::Serialize + FrameType, +{ + fn make_frame(&self) -> Result { + match self { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => { + let frame_type_id = k.frame_type_id(); + make_frame_2(self, frame_type_id) + } + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => make_range_complete_frame(), + Ok(StreamItem::Log(item)) => make_log_frame(item), + Ok(StreamItem::Stats(item)) => make_stats_frame(item), + Err(e) => make_error_frame(e), + } + } +} + +impl Framable for Box +where + T: Framable + ?Sized, +{ + fn make_frame(&self) -> Result { + self.as_ref().make_frame() + } +} + +pub trait FrameDecodable: FrameTypeStatic + DeserializeOwned { + fn from_error(e: ::err::Error) -> Self; + fn from_log(item: LogItem) -> Self; + fn from_stats(item: StatsItem) -> Self; + fn from_range_complete() -> Self; +} + +impl FrameDecodable for Sitemty +where + T: FrameTypeInnerStatic + DeserializeOwned, +{ + fn from_error(e: err::Error) -> Self { + Err(e) + } + + fn from_log(item: LogItem) -> Self { + Ok(StreamItem::Log(item)) + } + + fn from_stats(item: StatsItem) -> Self { + Ok(StreamItem::Stats(item)) + } + + fn from_range_complete() -> Self { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } +} + +#[derive(Serialize, Deserialize)] +pub struct EventQueryJsonStringFrame(pub String); + +impl FrameTypeInnerStatic for EventQueryJsonStringFrame { + const FRAME_TYPE_ID: u32 = EVENT_QUERY_JSON_STRING_FRAME; +} + +impl FrameType for EventQueryJsonStringFrame { + fn frame_type_id(&self) -> u32 { + EventQueryJsonStringFrame::FRAME_TYPE_ID + } +} + +impl FrameType for Sitemty +where + T: FrameType, +{ + fn frame_type_id(&self) -> u32 { + match self { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => SITEMTY_NONSPEC_FRAME_TYPE_ID, + RangeCompletableItem::Data(item) => item.frame_type_id(), + }, + StreamItem::Log(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID, + StreamItem::Stats(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID, + }, + Err(_) => ERROR_FRAME_TYPE_ID, + } + } +} diff --git a/items/src/frame.rs b/items_2/src/frame.rs similarity index 93% rename from items/src/frame.rs rename to items_2/src/frame.rs index d80dffd..519cde5 100644 --- a/items/src/frame.rs +++ b/items_2/src/frame.rs @@ -1,13 +1,28 @@ +use crate::framable::FrameDecodable; +use crate::framable::FrameType; +use crate::framable::INMEM_FRAME_ENCID; +use crate::framable::INMEM_FRAME_HEAD; +use crate::framable::INMEM_FRAME_MAGIC; use crate::inmem::InMemoryFrame; -use crate::{ContainsError, FrameDecodable, FrameType, LogItem, StatsItem}; -use crate::{ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; -use crate::{LOG_FRAME_TYPE_ID, RANGE_COMPLETE_FRAME_TYPE_ID, STATS_FRAME_TYPE_ID, TERM_FRAME_TYPE_ID}; -use bincode::config::{FixintEncoding, LittleEndian, RejectTrailing}; -use bincode::config::{WithOtherEndian, WithOtherIntEncoding, WithOtherTrailing}; +use bincode::config::FixintEncoding; +use bincode::config::LittleEndian; +use 
bincode::config::RejectTrailing;
+use bincode::config::WithOtherEndian;
+use bincode::config::WithOtherIntEncoding;
+use bincode::config::WithOtherTrailing;
 use bincode::DefaultOptions;
-use bytes::{BufMut, BytesMut};
+use bytes::BufMut;
+use bytes::BytesMut;
 use err::Error;
 use items_0::bincode;
+use items_0::streamitem::ContainsError;
+use items_0::streamitem::LogItem;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::ERROR_FRAME_TYPE_ID;
+use items_0::streamitem::LOG_FRAME_TYPE_ID;
+use items_0::streamitem::RANGE_COMPLETE_FRAME_TYPE_ID;
+use items_0::streamitem::STATS_FRAME_TYPE_ID;
+use items_0::streamitem::TERM_FRAME_TYPE_ID;
 #[allow(unused)]
 use netpod::log::*;
 use serde::Serialize;
diff --git a/items/src/inmem.rs b/items_2/src/inmem.rs
similarity index 100%
rename from items/src/inmem.rs
rename to items_2/src/inmem.rs
diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs
index d5b4be0..418c896 100644
--- a/items_2/src/items_2.rs
+++ b/items_2/src/items_2.rs
@@ -1,11 +1,14 @@
 pub mod binsdim0;
 pub mod binsxbindim0;
 pub mod channelevents;
-pub mod databuffereventblobs;
+pub mod eventfull;
 pub mod eventsdim0;
 pub mod eventsdim1;
 pub mod eventsxbindim0;
 pub mod eventtransform;
+pub mod framable;
+pub mod frame;
+pub mod inmem;
 pub mod merger;
 pub mod streams;
 #[cfg(test)]
@@ -20,11 +23,11 @@ use chrono::Utc;
 use futures_util::FutureExt;
 use futures_util::Stream;
 use futures_util::StreamExt;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
 use items_0::collect_s::Collector;
 use items_0::collect_s::ToJsonResult;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
 use items_0::Empty;
 use items_0::Events;
 use items_0::RangeOverlapInfo;
diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs
index 8a6c7da..f283d6e 100644
--- a/items_2/src/merger.rs
+++ b/items_2/src/merger.rs
@@ -1,10 +1,10 @@
 use crate::Error;
 use futures_util::Stream;
 use futures_util::StreamExt;
-use items::sitem_data;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
+use items_0::streamitem::sitem_data;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
 use netpod::log::*;
 use std::collections::VecDeque;
 use std::fmt;
@@ -359,7 +359,7 @@ where
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> {
         use Poll::*;
         self.poll_count += 1;
-        let span1 = span!(Level::TRACE, "Merger", pc = self.poll_count);
+        let span1 = span!(Level::INFO, "Merger", pc = self.poll_count);
         let _spg = span1.enter();
         loop {
             trace3!("poll");
diff --git a/items_2/src/test.rs b/items_2/src/test.rs
index cdcd94d..f36dfaa 100644
--- a/items_2/src/test.rs
+++ b/items_2/src/test.rs
@@ -15,10 +15,10 @@ use chrono::TimeZone;
 use chrono::Utc;
 use futures_util::stream;
 use futures_util::StreamExt;
-use items::sitem_data;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
+use items_0::streamitem::sitem_data;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
 use items_0::Empty;
 use netpod::log::*;
 use netpod::timeunits::*;
diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs
index 638f2d6..ea8b05e 100644
--- a/nodenet/src/conn.rs
+++ b/nodenet/src/conn.rs
@@ -1,15 +1,17 @@
 use err::Error;
 use futures_util::Stream;
 use futures_util::StreamExt;
-use items::frame::decode_frame;
-use items::frame::make_term_frame;
-use items::EventQueryJsonStringFrame;
-use items::Framable;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
+use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME;
 use items_0::Empty;
 use items_2::channelevents::ChannelEvents;
+use items_2::framable::EventQueryJsonStringFrame;
+use items_2::framable::Framable;
+use items_2::frame::decode_frame;
+use items_2::frame::make_error_frame;
+use items_2::frame::make_term_frame;
 use netpod::histo::HistoLog2;
 use netpod::log::*;
 use netpod::query::PlainEventsQuery;
@@ -232,7 +234,7 @@
         return Err((Error::with_msg("missing command frame"), netout).into());
     }
     let query_frame = &frames[0];
-    if query_frame.tyid() != items::EVENT_QUERY_JSON_STRING_FRAME {
+    if query_frame.tyid() != EVENT_QUERY_JSON_STRING_FRAME {
         return Err((Error::with_msg("query frame wrong type"), netout).into());
     }
     // TODO this does not need all variants of Sitemty.
@@ -364,7 +366,7 @@
     // If that fails, give error to the caller.
     let mut out = ce.netout;
     let e = ce.err;
-    let buf = items::frame::make_error_frame(&e)?;
+    let buf = make_error_frame(&e)?;
     //type T = StreamItem>>;
     //let buf = Err::(e).make_frame()?;
     out.write_all(&buf).await?;
diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs
index 96f428d..477a8df 100644
--- a/nodenet/src/conn/test.rs
+++ b/nodenet/src/conn/test.rs
@@ -1,12 +1,33 @@
-use std::time::Duration;
-
-use super::*;
-use items::frame::make_frame;
-use items::Sitemty;
+use crate::conn::events_conn_handler;
+use futures_util::StreamExt;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
+use items_0::streamitem::ERROR_FRAME_TYPE_ID;
+use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
+use items_0::streamitem::LOG_FRAME_TYPE_ID;
+use items_0::streamitem::STATS_FRAME_TYPE_ID;
 use items_2::channelevents::ChannelEvents;
+use items_2::framable::EventQueryJsonStringFrame;
+use items_2::frame::decode_frame;
+use items_2::frame::make_frame;
+use netpod::query::PlainEventsQuery;
 use netpod::timeunits::SEC;
-use netpod::{Channel, Cluster, Database, FileIoBufferSize, NanoRange, Node, NodeConfig, SfDatabuffer};
+use netpod::AggKind;
+use netpod::Channel;
+use netpod::Cluster;
+use netpod::Database;
+use netpod::FileIoBufferSize;
+use netpod::NanoRange;
+use netpod::Node;
+use netpod::NodeConfig;
+use netpod::NodeConfigCached;
+use netpod::SfDatabuffer;
+use std::time::Duration;
+use streams::frames::inmem::InMemoryFrameAsyncReadStream;
+use tokio::io::AsyncWriteExt;
 use tokio::net::TcpListener;
+use tokio::net::TcpStream;
 
 #[test]
 fn raw_data_00() {
@@ -82,10 +103,10 @@
                 Ok(frame) => match frame {
                     StreamItem::DataItem(k) => {
                         eprintln!("{k:?}");
-                        if k.tyid() == items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID {
-                        } else if k.tyid() == items::ERROR_FRAME_TYPE_ID {
-                        } else if k.tyid() == items::LOG_FRAME_TYPE_ID {
-                        } else if k.tyid() == items::STATS_FRAME_TYPE_ID {
+                        if k.tyid() == ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID {
+                        } else if k.tyid() == ERROR_FRAME_TYPE_ID {
+                        } else if k.tyid() == LOG_FRAME_TYPE_ID {
+                        } else if k.tyid() == STATS_FRAME_TYPE_ID {
                         } else {
                             panic!("unexpected frame type id {:x}", k.tyid());
                         }
diff --git a/streams/src/collect.rs b/streams/src/collect.rs
index bd7883a..059680c 100644
--- a/streams/src/collect.rs
+++ b/streams/src/collect.rs
@@ -1,10 +1,18 @@
 use err::Error;
-use futures_util::{Stream, StreamExt};
-use items::{RangeCompletableItem, Sitemty, StreamItem};
+use futures_util::Stream;
+use futures_util::StreamExt;
 use items_0::collect_c::Collectable;
-use netpod::{log::*, BinnedRange, NanoRange};
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::StreamItem;
+use netpod::log::*;
+use netpod::BinnedRange;
+use netpod::DiskStats;
+use netpod::NanoRange;
 use std::fmt;
-use std::time::{Duration, Instant};
+use std::time::Duration;
+use std::time::Instant;
 use tracing::Instrument;
 
 #[allow(unused)]
@@ -88,8 +96,6 @@
                 }
                 StreamItem::Stats(item) => {
                     trace!("Stats {:?}", item);
-                    use items::StatsItem;
-                    use netpod::DiskStats;
                     match item {
                         // TODO factor and simplify the stats collection:
                         StatsItem::EventDataReadStats(_) => {}
diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs
index 1d58778..5f90738 100644
--- a/streams/src/eventchunker.rs
+++ b/streams/src/eventchunker.rs
@@ -1,19 +1,30 @@
 use crate::filechunkread::FileChunkRead;
 use crate::needminbuffer::NeedMinBuffer;
 use bitshuffle::bitshuffle_decompress;
-use bytes::{Buf, BytesMut};
+use bytes::Buf;
+use bytes::BytesMut;
 use err::Error;
-use futures_util::{Stream, StreamExt};
-use items::eventfull::EventFull;
-use items::{RangeCompletableItem, StatsItem, StreamItem, WithLen};
+use futures_util::Stream;
+use futures_util::StreamExt;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::StreamItem;
+use items_0::WithLen;
+use items_2::eventfull::EventFull;
 use netpod::histo::HistoLog2;
 use netpod::log::*;
 use netpod::timeunits::SEC;
-use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
+use netpod::ByteSize;
+use netpod::ChannelConfig;
+use netpod::EventDataReadStats;
+use netpod::NanoRange;
+use netpod::ScalarType;
+use netpod::Shape;
 use parse::channelconfig::CompressionMethod;
 use std::path::PathBuf;
 use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::task::Context;
+use std::task::Poll;
 use std::time::Instant;
 
 pub struct EventChunker {
diff --git a/streams/src/frames/eventsfromframes.rs b/streams/src/frames/eventsfromframes.rs
index 99b1168..38c2cf7 100644
--- a/streams/src/frames/eventsfromframes.rs
+++ b/streams/src/frames/eventsfromframes.rs
@@ -1,11 +1,11 @@
 use err::Error;
 use futures_util::Stream;
 use futures_util::StreamExt;
-use items::frame::decode_frame;
-use items::inmem::InMemoryFrame;
-use items::FrameTypeInnerStatic;
-use items::Sitemty;
-use items::StreamItem;
+use items_0::framable::FrameTypeInnerStatic;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
+use items_2::frame::decode_frame;
+use items_2::inmem::InMemoryFrame;
 use netpod::log::*;
 use serde::de::DeserializeOwned;
 use std::marker::PhantomData;
diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs
index 5c65556..45f52ca 100644
--- a/streams/src/frames/inmem.rs
+++ b/streams/src/frames/inmem.rs
@@ -1,15 +1,20 @@
 use crate::slidebuf::SlideBuf;
 use bytes::Bytes;
 use err::Error;
-use futures_util::{pin_mut, Stream};
-use items::inmem::InMemoryFrame;
-use items::{StreamItem, TERM_FRAME_TYPE_ID};
-use items::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};
+use futures_util::pin_mut;
+use futures_util::Stream;
+use items_0::streamitem::StreamItem;
+use items_0::streamitem::TERM_FRAME_TYPE_ID;
+use items_2::framable::INMEM_FRAME_FOOT;
+use items_2::framable::INMEM_FRAME_HEAD;
+use items_2::framable::INMEM_FRAME_MAGIC;
+use items_2::inmem::InMemoryFrame;
 use netpod::log::*;
 use std::pin::Pin;
-use std::task::{Context, Poll};
-use tokio::io::{AsyncRead, ReadBuf};
-use tracing::Instrument;
+use std::task::Context;
+use std::task::Poll;
+use tokio::io::AsyncRead;
+use tokio::io::ReadBuf;
 
 #[allow(unused)]
 macro_rules! trace2 {
diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs
index 73eef1c..d531f5a 100644
--- a/streams/src/plaineventsjson.rs
+++ b/streams/src/plaineventsjson.rs
@@ -1,7 +1,9 @@
+use crate::rangefilter2::RangeFilter2;
 use crate::tcprawclient::open_tcp_streams;
 use err::Error;
 use futures_util::stream;
 use futures_util::StreamExt;
+use items_0::streamitem::sitem_data;
 use items_2::channelevents::ChannelEvents;
 use netpod::log::*;
 use netpod::query::PlainEventsQuery;
@@ -38,7 +40,7 @@
     let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, &ev_agg_kind)?;
     info!("plain_events_json with empty item {}", empty.type_name());
     let empty = ChannelEvents::Events(empty);
-    let empty = items::sitem_data(empty);
+    let empty = sitem_data(empty);
     // TODO should be able to ask for data-events only, instead of mixed data and status events.
     let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
     //let inps = open_tcp_streams::<_, Box>(&query, cluster).await?;
@@ -48,7 +50,7 @@
         info!("item after merge: {item:?}");
         item
     });
-    let stream = crate::rangefilter2::RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range());
+    let stream = RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range());
     let stream = stream.map(|item| {
         info!("item after rangefilter: {item:?}");
         item
diff --git a/streams/src/rangefilter.rs b/streams/src/rangefilter.rs
index bdcc604..5ec5ac0 100644
--- a/streams/src/rangefilter.rs
+++ b/streams/src/rangefilter.rs
@@ -1,12 +1,22 @@
 use err::Error;
-use futures_util::{Stream, StreamExt};
-use items::StatsItem;
-use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps};
-use netpod::{log::*, RangeFilterStats};
-use netpod::{NanoRange, Nanos};
+use futures_util::Stream;
+use futures_util::StreamExt;
+use items::Appendable;
+use items::Clearable;
+use items::PushableIndex;
+use items::WithTimestamps;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::StreamItem;
+use netpod::log::*;
+use netpod::NanoRange;
+use netpod::Nanos;
+use netpod::RangeFilterStats;
 use std::fmt;
 use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::task::Context;
+use std::task::Poll;
 
 pub struct RangeFilter
 where
diff --git a/streams/src/rangefilter2.rs b/streams/src/rangefilter2.rs
index 7fec24a..ef3e784 100644
--- a/streams/src/rangefilter2.rs
+++ b/streams/src/rangefilter2.rs
@@ -1,10 +1,10 @@
 use err::Error;
 use futures_util::Stream;
 use futures_util::StreamExt;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StatsItem;
-use items::StreamItem;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::StreamItem;
 use items_2::merger::MergeError;
 use items_2::merger::Mergeable;
 use netpod::log::*;
diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs
index 9f6b869..f2b8c2b 100644
--- a/streams/src/tcprawclient.rs
+++ b/streams/src/tcprawclient.rs
@@ -10,19 +10,22 @@ use crate::frames::inmem::InMemoryFrameAsyncReadStream;
 use err::Error;
 use futures_util::Stream;
 use futures_util::StreamExt;
-use items::eventfull::EventFull;
-use items::frame::make_frame;
-use items::frame::make_term_frame;
-use items::sitem_data;
-use items::EventQueryJsonStringFrame;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
+use items_0::framable::FrameTypeInnerStatic;
+use items_0::streamitem::sitem_data;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
+use items_2::eventfull::EventFull;
+use items_2::framable::EventQueryJsonStringFrame;
+use items_2::frame::make_frame;
+use items_2::frame::make_term_frame;
 use netpod::log::*;
 use netpod::query::PlainEventsQuery;
 use netpod::Cluster;
 use netpod::Node;
 use netpod::PerfOpts;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
 use std::fmt;
 use std::pin::Pin;
 use tokio::io::AsyncWriteExt;
@@ -59,9 +62,9 @@
 pub type BoxedStream = Pin> + Send>>;
 pub async fn open_tcp_streams(query: Q, cluster: &Cluster) -> Result>, Error>
 where
-    Q: serde::Serialize,
+    Q: Serialize,
     // Group bounds in new trait
-    T: items::FrameTypeInnerStatic + serde::de::DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
+    T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
 {
     // TODO when unit tests established, change to async connect:
     let mut streams = Vec::new();
diff --git a/streams/src/test.rs b/streams/src/test.rs
index 60ced5d..9f1550d 100644
--- a/streams/src/test.rs
+++ b/streams/src/test.rs
@@ -4,8 +4,10 @@ mod collect;
 mod timebin;
 
 use err::Error;
-use futures_util::{stream, Stream};
-use items::{sitem_data, Sitemty};
+use futures_util::stream;
+use futures_util::Stream;
+use items_0::streamitem::sitem_data;
+use items_0::streamitem::Sitemty;
 use items_0::Empty;
 use items_2::channelevents::ChannelEvents;
 use items_2::eventsdim0::EventsDim0;
diff --git a/streams/src/test/collect.rs b/streams/src/test/collect.rs
index d2850c2..cf517b0 100644
--- a/streams/src/test/collect.rs
+++ b/streams/src/test/collect.rs
@@ -1,11 +1,12 @@
 use crate::test::runfut;
 use err::Error;
 use futures_util::stream;
-use items::sitem_data;
+use items_0::streamitem::sitem_data;
 use items_2::eventsdim0::EventsDim0CollectorOutput;
 use items_2::testgen::make_some_boxed_d0_f32;
 use netpod::timeunits::SEC;
-use std::time::{Duration, Instant};
+use std::time::Duration;
+use std::time::Instant;
 
 #[test]
 fn collect_channel_events() -> Result<(), Error> {
diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs
index 3d2e311..f627a33 100644
--- a/streams/src/test/timebin.rs
+++ b/streams/src/test/timebin.rs
@@ -1,14 +1,21 @@
 use crate::test::runfut;
 use err::Error;
-use futures_util::{stream, StreamExt};
-use items::{sitem_data, RangeCompletableItem, StreamItem};
+use futures_util::stream;
+use futures_util::StreamExt;
+use items_0::streamitem::sitem_data;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::StreamItem;
 use items_0::Empty;
 use items_2::binsdim0::BinsDim0;
-use items_2::channelevents::{ChannelEvents, ConnStatus, ConnStatusEvent};
+use items_2::channelevents::ChannelEvents;
+use items_2::channelevents::ConnStatus;
+use items_2::channelevents::ConnStatusEvent;
 use items_2::testgen::make_some_boxed_d0_f32;
-use netpod::timeunits::{MS, SEC};
+use netpod::timeunits::MS;
+use netpod::timeunits::SEC;
 use std::collections::VecDeque;
-use std::time::{Duration, Instant};
+use std::time::Duration;
+use std::time::Instant;
 
 #[test]
 fn time_bin_00() {
diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs
index 97fca8d..9cdc874 100644
--- a/streams/src/timebin.rs
+++ b/streams/src/timebin.rs
@@ -1,12 +1,19 @@
 use err::Error;
-use futures_util::{Future, FutureExt, Stream, StreamExt};
-use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem};
-//use items_0::{TimeBinnable, TimeBinner};
-use items_2::timebin::{TimeBinnable, TimeBinner};
+use futures_util::Future;
+use futures_util::FutureExt;
+use futures_util::Stream;
+use futures_util::StreamExt;
+use items_0::streamitem::sitem_data;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
+use items_2::timebin::TimeBinnable;
+use items_2::timebin::TimeBinner;
 use netpod::log::*;
 use std::fmt;
 use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::task::Context;
+use std::task::Poll;
 use std::time::Instant;
 
 #[allow(unused)]
diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs
index bf9d27e..21d1f97 100644
--- a/streams/src/timebinnedjson.rs
+++ b/streams/src/timebinnedjson.rs
@@ -1,8 +1,13 @@
+use crate::rangefilter2::RangeFilter2;
 use crate::tcprawclient::open_tcp_streams;
+use crate::timebin::TimeBinnedStream;
 use err::Error;
 use futures_util::stream;
 use futures_util::StreamExt;
+use items_0::streamitem::sitem_data;
+use items_0::streamitem::Sitemty;
 use items_2::channelevents::ChannelEvents;
+use items_2::merger::Merger;
 #[allow(unused)]
 use netpod::log::*;
 use netpod::query::BinnedQuery;
@@ -20,7 +25,7 @@
     let deadline = Instant::now() + query.timeout_value();
     let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, &query.agg_kind())?;
     let empty = ChannelEvents::Events(empty);
-    let empty = items::sitem_data(empty);
+    let empty = sitem_data(empty);
     let evquery = PlainEventsQuery::new(
         query.channel().clone(),
         query.range().clone(),
@@ -31,14 +36,14 @@
     let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
     // TODO propagate also the max-buf-len for the first stage event reader:
     info!("timebinned_json with empty item {empty:?}");
-    let stream = items_2::merger::Merger::new(inps, 128);
+    let stream = Merger::new(inps, 128);
     let stream = stream::iter([empty]).chain(stream);
-    let stream = crate::rangefilter2::RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range());
+    let stream = RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range());
     let stream = Box::pin(stream);
-    let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline);
+    let stream = TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline);
     if false {
         let mut stream = stream;
-        let _: Option>> = stream.next().await;
+        let _: Option>> = stream.next().await;
         panic!()
     }
     let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?;