diff --git a/commonio/Cargo.toml b/commonio/Cargo.toml
index 6a08967..57208d9 100644
--- a/commonio/Cargo.toml
+++ b/commonio/Cargo.toml
@@ -23,5 +23,5 @@ crc32fast = "1.2"
err = { path = "../err" }
taskrun = { path = "../taskrun" }
netpod = { path = "../netpod" }
-items = { path = "../items" }
+items_0 = { path = "../items_0" }
items_proc = { path = "../items_proc" }
diff --git a/commonio/src/commonio.rs b/commonio/src/commonio.rs
index 836e192..ec79e08 100644
--- a/commonio/src/commonio.rs
+++ b/commonio/src/commonio.rs
@@ -4,9 +4,9 @@ use async_channel::Sender;
use err::ErrStr;
use err::Error;
use futures_util::StreamExt;
-use items::Sitemty;
-use items::StatsItem;
-use items::StreamItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::StreamItem;
use netpod::log::*;
use netpod::DiskStats;
use netpod::OpenStats;
diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs
index 81cf981..3b1341c 100644
--- a/daqbufp2/src/client.rs
+++ b/daqbufp2/src/client.rs
@@ -7,7 +7,7 @@ use futures_util::TryStreamExt;
use http::StatusCode;
use httpclient::HttpBodyAsAsyncRead;
use hyper::Body;
-use items::StreamItem;
+use items_0::streamitem::StreamItem;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::query::CacheUsage;
diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs
index 467264c..13bcc2a 100644
--- a/daqbufp2/src/test/binnedbinary.rs
+++ b/daqbufp2/src/test/binnedbinary.rs
@@ -9,7 +9,7 @@ use futures_util::TryStreamExt;
use http::StatusCode;
use httpclient::HttpBodyAsAsyncRead;
use hyper::Body;
-use items::StreamItem;
+use items_0::streamitem::StreamItem;
use items_0::subfr::SubFrId;
use netpod::log::*;
use netpod::query::BinnedQuery;
diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs
index d5a0b7f..6ed0ef9 100644
--- a/daqbufp2/src/test/events.rs
+++ b/daqbufp2/src/test/events.rs
@@ -9,7 +9,7 @@ use futures_util::TryStreamExt;
use http::StatusCode;
use httpclient::HttpBodyAsAsyncRead;
use hyper::Body;
-use items::StreamItem;
+use items_0::streamitem::StreamItem;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::AggKind;
diff --git a/disk/src/decode.rs b/disk/src/decode.rs
index 2f09e79..5fb7c6f 100644
--- a/disk/src/decode.rs
+++ b/disk/src/decode.rs
@@ -2,12 +2,12 @@ use crate::eventblobs::EventChunkerMultifile;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
-use items::eventfull::EventFull;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
use items_0::scalar_ops::ScalarOps;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
use items_0::Events;
+use items_2::eventfull::EventFull;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
#[allow(unused)]
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index c847379..a5d2e92 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -5,11 +5,11 @@ use crate::merge::MergedStream;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
-use items::eventfull::EventFull;
-use items::LogItem;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
+use items_0::streamitem::LogItem;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
+use items_2::eventfull::EventFull;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::ChannelConfig;
@@ -262,11 +262,15 @@ mod test {
use crate::eventblobs::EventChunkerMultifile;
use err::Error;
use futures_util::StreamExt;
- use items::{RangeCompletableItem, StreamItem};
+ use items_0::streamitem::RangeCompletableItem;
+ use items_0::streamitem::StreamItem;
use netpod::log::*;
- use netpod::timeunits::{DAY, MS};
+ use netpod::timeunits::DAY;
+ use netpod::timeunits::MS;
+ use netpod::ByteSize;
+ use netpod::ChannelConfig;
use netpod::DiskIoTune;
- use netpod::{ByteSize, ChannelConfig, Nanos};
+ use netpod::Nanos;
use streams::eventchunker::EventChunkerConf;
use streams::rangefilter::RangeFilter;
diff --git a/disk/src/merge.rs b/disk/src/merge.rs
index cf16f63..e8e83ca 100644
--- a/disk/src/merge.rs
+++ b/disk/src/merge.rs
@@ -5,13 +5,13 @@ use futures_util::Stream;
use futures_util::StreamExt;
use items::Appendable;
use items::ByteEstimate;
-use items::LogItem;
use items::PushableIndex;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StatsItem;
-use items::StreamItem;
use items::WithTimestamps;
+use items_0::streamitem::LogItem;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::StreamItem;
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::ByteSize;
@@ -312,11 +312,20 @@ mod test {
use crate::merge::MergedStream;
use err::Error;
use futures_util::StreamExt;
- use items::{RangeCompletableItem, StreamItem};
+ use items_0::streamitem::RangeCompletableItem;
+ use items_0::streamitem::StreamItem;
use netpod::log::*;
use netpod::test_data_base_path_databuffer;
- use netpod::timeunits::{DAY, MS};
- use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape};
+ use netpod::timeunits::DAY;
+ use netpod::timeunits::MS;
+ use netpod::ByteOrder;
+ use netpod::ByteSize;
+ use netpod::Channel;
+ use netpod::ChannelConfig;
+ use netpod::NanoRange;
+ use netpod::Nanos;
+ use netpod::ScalarType;
+ use netpod::Shape;
use std::path::PathBuf;
use streams::eventchunker::EventChunker;
use streams::eventchunker::EventChunkerConf;
diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs
index 56d46c1..bd22b64 100644
--- a/disk/src/merge/mergedblobsfromremotes.rs
+++ b/disk/src/merge/mergedblobsfromremotes.rs
@@ -3,8 +3,8 @@ use err::Error;
use futures_util::pin_mut;
use futures_util::Stream;
use futures_util::StreamExt;
-use items::eventfull::EventFull;
-use items::Sitemty;
+use items_0::streamitem::Sitemty;
+use items_2::eventfull::EventFull;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::Cluster;
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index b532649..df4d8a8 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -3,11 +3,11 @@ use err::Error;
use futures_util::stream;
use futures_util::Stream;
use futures_util::StreamExt;
-use items::eventfull::EventFull;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
use items_2::channelevents::ChannelEvents;
+use items_2::eventfull::EventFull;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::AggKind;
diff --git a/disk/src/streamlog.rs b/disk/src/streamlog.rs
index 5399715..af6f897 100644
--- a/disk/src/streamlog.rs
+++ b/disk/src/streamlog.rs
@@ -1,4 +1,4 @@
-use items::LogItem;
+use items_0::streamitem::LogItem;
use netpod::log::*;
use std::collections::VecDeque;
diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs
index 6d7c5a9..50f1721 100644
--- a/httpret/src/api1.rs
+++ b/httpret/src/api1.rs
@@ -1,30 +1,60 @@
use crate::err::Error;
-use crate::gather::{gather_get_json_generic, SubRes};
-use crate::{response, BodyStream, ReqCtx};
-use bytes::{BufMut, BytesMut};
-use futures_util::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
-use http::{Method, StatusCode};
-use hyper::{Body, Client, Request, Response};
-use items::eventfull::EventFull;
-use items::{RangeCompletableItem, Sitemty, StreamItem};
+use crate::gather::gather_get_json_generic;
+use crate::gather::SubRes;
+use crate::response;
+use crate::BodyStream;
+use crate::ReqCtx;
+use bytes::BufMut;
+use bytes::BytesMut;
+use futures_util::FutureExt;
+use futures_util::Stream;
+use futures_util::StreamExt;
+use futures_util::TryFutureExt;
+use futures_util::TryStreamExt;
+use http::Method;
+use http::StatusCode;
+use hyper::Body;
+use hyper::Client;
+use hyper::Request;
+use hyper::Response;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
+use items_2::eventfull::EventFull;
use itertools::Itertools;
+use netpod::log::*;
use netpod::query::api1::Api1Query;
use netpod::query::PlainEventsQuery;
use netpod::timeunits::SEC;
-use netpod::{log::*, ScalarType};
-use netpod::{ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, PerfOpts, Shape};
-use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig};
-use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
+use netpod::ByteSize;
+use netpod::Channel;
+use netpod::ChannelSearchQuery;
+use netpod::ChannelSearchResult;
+use netpod::DiskIoTune;
+use netpod::NanoRange;
+use netpod::NodeConfigCached;
+use netpod::PerfOpts;
+use netpod::ProxyConfig;
+use netpod::ScalarType;
+use netpod::Shape;
+use netpod::ACCEPT_ALL;
+use netpod::APP_JSON;
+use netpod::APP_OCTET;
use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
-use parse::channelconfig::{Config, ConfigEntry, MatchingConfigEntry};
-use serde::{Deserialize, Serialize};
+use parse::channelconfig::Config;
+use parse::channelconfig::ConfigEntry;
+use parse::channelconfig::MatchingConfigEntry;
+use serde::Deserialize;
+use serde::Serialize;
use serde_json::Value as JsonValue;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::time::{Duration, Instant};
+use std::task::Context;
+use std::task::Poll;
+use std::time::Duration;
+use std::time::Instant;
use streams::eventchunker::EventChunkerConf;
use tracing_futures::Instrument;
use url::Url;
diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs
index 9cf9f26..90944a1 100644
--- a/httpret/src/httpret.rs
+++ b/httpret/src/httpret.rs
@@ -541,6 +541,7 @@ async fn update_db_with_channel_names(
}
}
+#[allow(unused)]
async fn update_db_with_channel_names_3(
req: Request
,
_ctx: &ReqCtx,
@@ -611,9 +612,9 @@ pub struct StatusBoardEntry {
ts_created: SystemTime,
#[serde(serialize_with = "instant_serde::ser")]
ts_updated: SystemTime,
- #[serde(skip_serializing_if = "items::bool_is_false")]
+ #[serde(skip_serializing_if = "items_2::bool_is_false")]
is_error: bool,
- #[serde(skip_serializing_if = "items::bool_is_false")]
+ #[serde(skip_serializing_if = "items_2::bool_is_false")]
is_ok: bool,
#[serde(skip_serializing_if = "Vec::is_empty")]
errors: Vec,
diff --git a/items/Cargo.toml b/items/Cargo.toml
index e08262c..e471660 100644
--- a/items/Cargo.toml
+++ b/items/Cargo.toml
@@ -1,19 +1,15 @@
[package]
name = "items"
-version = "0.0.2"
+version = "0.1.0"
authors = ["Dominik Werder "]
edition = "2021"
-[lib]
-path = "src/items.rs"
-
[dependencies]
tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
futures-util = "0.3.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
ciborium = "0.2"
-rmp-serde = "1.1.1"
bson = "2.4.0"
erased-serde = "0.3"
bytes = "1.2.1"
@@ -24,4 +20,3 @@ err = { path = "../err" }
items_proc = { path = "../items_proc" }
items_0 = { path = "../items_0" }
netpod = { path = "../netpod" }
-parse = { path = "../parse" }
diff --git a/items/src/items.rs b/items/src/items.rs
deleted file mode 100644
index 6e91b85..0000000
--- a/items/src/items.rs
+++ /dev/null
@@ -1,701 +0,0 @@
-pub mod eventfull;
-pub mod frame;
-pub mod inmem;
-pub mod streams;
-
-use crate::frame::make_frame_2;
-use bytes::BytesMut;
-use chrono::TimeZone;
-use chrono::Utc;
-use err::Error;
-use frame::make_error_frame;
-use frame::make_log_frame;
-use frame::make_range_complete_frame;
-use frame::make_stats_frame;
-use items_0::AsAnyRef;
-use netpod::log::Level;
-#[allow(unused)]
-use netpod::log::*;
-use netpod::DiskStats;
-use netpod::EventDataReadStats;
-use netpod::NanoRange;
-use netpod::RangeFilterStats;
-use netpod::Shape;
-use serde::de::DeserializeOwned;
-use serde::Deserialize;
-use serde::Serialize;
-use serde::Serializer;
-use std::any::Any;
-use std::fmt;
-use std::future::Future;
-use std::marker::PhantomData;
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
-use tokio::fs::File;
-use tokio::io::AsyncRead;
-use tokio::io::ReadBuf;
-
-pub const TERM_FRAME_TYPE_ID: u32 = 0xaa01;
-pub const ERROR_FRAME_TYPE_ID: u32 = 0xaa02;
-pub const SITEMTY_NONSPEC_FRAME_TYPE_ID: u32 = 0xaa04;
-pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100;
-pub const EVENTS_0D_FRAME_TYPE_ID: u32 = 0x500;
-pub const MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID: u32 = 0x700;
-pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0x800;
-pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00;
-pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0xb00;
-pub const LOG_FRAME_TYPE_ID: u32 = 0xc00;
-pub const STATS_FRAME_TYPE_ID: u32 = 0xd00;
-pub const RANGE_COMPLETE_FRAME_TYPE_ID: u32 = 0xe00;
-pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200;
-pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300;
-pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400;
-pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500;
-pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800;
-pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900;
-pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00;
-
-pub fn bool_is_false(j: &bool) -> bool {
- *j == false
-}
-
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub enum RangeCompletableItem {
- RangeComplete,
- Data(T),
-}
-
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub enum StatsItem {
- EventDataReadStats(EventDataReadStats),
- RangeFilterStats(RangeFilterStats),
- DiskStats(DiskStats),
-}
-
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub enum StreamItem {
- DataItem(T),
- Log(LogItem),
- Stats(StatsItem),
-}
-
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub struct LogItem {
- pub node_ix: u32,
- #[serde(with = "levelserde")]
- pub level: Level,
- pub msg: String,
-}
-
-impl LogItem {
- pub fn quick(level: Level, msg: String) -> Self {
- Self {
- level,
- msg,
- node_ix: 42,
- }
- }
-}
-
-pub type Sitemty = Result>, Error>;
-
-#[macro_export]
-macro_rules! on_sitemty_range_complete {
- ($item:expr, $ex:expr) => {
- if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item {
- $ex
- }
- };
-}
-
-impl FrameType for Sitemty
-where
- T: FrameType,
-{
- fn frame_type_id(&self) -> u32 {
- match self {
- Ok(item) => match item {
- StreamItem::DataItem(item) => match item {
- RangeCompletableItem::RangeComplete => SITEMTY_NONSPEC_FRAME_TYPE_ID,
- RangeCompletableItem::Data(item) => item.frame_type_id(),
- },
- StreamItem::Log(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID,
- StreamItem::Stats(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID,
- },
- Err(_) => ERROR_FRAME_TYPE_ID,
- }
- }
-}
-
-pub fn sitem_data(x: X) -> Sitemty {
- Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
-}
-
-mod levelserde {
- use super::Level;
- use serde::de::{self, Visitor};
- use serde::{Deserializer, Serializer};
- use std::fmt;
-
- pub fn serialize(t: &Level, se: S) -> Result
- where
- S: Serializer,
- {
- let g = match *t {
- Level::ERROR => 1,
- Level::WARN => 2,
- Level::INFO => 3,
- Level::DEBUG => 4,
- Level::TRACE => 5,
- };
- se.serialize_u32(g)
- }
-
- struct VisitLevel;
-
- impl VisitLevel {
- fn from_u32(x: u32) -> Level {
- match x {
- 1 => Level::ERROR,
- 2 => Level::WARN,
- 3 => Level::INFO,
- 4 => Level::DEBUG,
- 5 => Level::TRACE,
- _ => Level::TRACE,
- }
- }
- }
-
- impl<'de> Visitor<'de> for VisitLevel {
- type Value = Level;
-
- fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- write!(fmt, "expect Level code")
- }
-
- fn visit_u64(self, val: u64) -> Result
- where
- E: de::Error,
- {
- Ok(VisitLevel::from_u32(val as _))
- }
- }
-
- pub fn deserialize<'de, D>(de: D) -> Result
- where
- D: Deserializer<'de>,
- {
- de.deserialize_u32(VisitLevel)
- }
-}
-
-pub const INMEM_FRAME_ENCID: u32 = 0x12121212;
-pub const INMEM_FRAME_HEAD: usize = 20;
-pub const INMEM_FRAME_FOOT: usize = 4;
-pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d;
-
-// Required for any inner type of Sitemty.
-pub trait FrameTypeInnerStatic {
- const FRAME_TYPE_ID: u32;
-}
-
-// To be implemented by the T of Sitemty, e.g. ScalarEvents.
-pub trait FrameTypeInnerDyn {
- // TODO check actual usage of this
- fn frame_type_id(&self) -> u32;
-}
-
-impl FrameTypeInnerDyn for T
-where
- T: FrameTypeInnerStatic,
-{
- fn frame_type_id(&self) -> u32 {
- ::FRAME_TYPE_ID
- }
-}
-
-pub trait FrameTypeStatic {
- const FRAME_TYPE_ID: u32;
-}
-
-impl FrameTypeStatic for Sitemty
-where
- T: FrameTypeInnerStatic,
-{
- const FRAME_TYPE_ID: u32 = ::FRAME_TYPE_ID;
-}
-
-// Framable trait objects need some inspection to handle the supposed-to-be common Err ser format:
-// Meant to be implemented by Sitemty.
-pub trait FrameType {
- fn frame_type_id(&self) -> u32;
-}
-
-impl FrameType for Box
-where
- T: FrameType,
-{
- fn frame_type_id(&self) -> u32 {
- self.as_ref().frame_type_id()
- }
-}
-
-impl FrameTypeInnerDyn for Box {
- fn frame_type_id(&self) -> u32 {
- FrameTypeInnerDyn::frame_type_id(self.as_time_binnable_dyn())
- }
-}
-
-impl FrameTypeInnerDyn for Box {
- fn frame_type_id(&self) -> u32 {
- FrameTypeInnerDyn::frame_type_id(self.as_time_binnable_dyn())
- }
-}
-
-pub trait ContainsError {
- fn is_err(&self) -> bool;
- fn err(&self) -> Option<&::err::Error>;
-}
-
-impl ContainsError for Box
-where
- T: ContainsError,
-{
- fn is_err(&self) -> bool {
- self.as_ref().is_err()
- }
-
- fn err(&self) -> Option<&::err::Error> {
- self.as_ref().err()
- }
-}
-
-impl ContainsError for Sitemty {
- fn is_err(&self) -> bool {
- match self {
- Ok(_) => false,
- Err(_) => true,
- }
- }
-
- fn err(&self) -> Option<&::err::Error> {
- match self {
- Ok(_) => None,
- Err(e) => Some(e),
- }
- }
-}
-
-pub trait Framable {
- fn make_frame(&self) -> Result;
-}
-
-pub trait FramableInner: erased_serde::Serialize + FrameTypeInnerDyn + Send {
- fn _dummy(&self);
-}
-
-impl FramableInner for T {
- fn _dummy(&self) {}
-}
-
-erased_serde::serialize_trait_object!(EventsDyn);
-erased_serde::serialize_trait_object!(TimeBinnableDyn);
-erased_serde::serialize_trait_object!(TimeBinned);
-
-impl Framable for Sitemty
-where
- T: Sized + serde::Serialize + FrameType,
-{
- fn make_frame(&self) -> Result {
- match self {
- Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => {
- let frame_type_id = k.frame_type_id();
- make_frame_2(self, frame_type_id)
- }
- Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => make_range_complete_frame(),
- Ok(StreamItem::Log(item)) => make_log_frame(item),
- Ok(StreamItem::Stats(item)) => make_stats_frame(item),
- Err(e) => make_error_frame(e),
- }
- }
-}
-
-impl Framable for Box
-where
- T: Framable + ?Sized,
-{
- fn make_frame(&self) -> Result {
- self.as_ref().make_frame()
- }
-}
-
-pub trait FrameDecodable: FrameTypeStatic + DeserializeOwned {
- fn from_error(e: ::err::Error) -> Self;
- fn from_log(item: LogItem) -> Self;
- fn from_stats(item: StatsItem) -> Self;
- fn from_range_complete() -> Self;
-}
-
-impl FrameDecodable for Sitemty
-where
- T: FrameTypeInnerStatic + DeserializeOwned,
-{
- fn from_error(e: err::Error) -> Self {
- Err(e)
- }
-
- fn from_log(item: LogItem) -> Self {
- Ok(StreamItem::Log(item))
- }
-
- fn from_stats(item: StatsItem) -> Self {
- Ok(StreamItem::Stats(item))
- }
-
- fn from_range_complete() -> Self {
- Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
- }
-}
-
-#[derive(Serialize, Deserialize)]
-pub struct EventQueryJsonStringFrame(pub String);
-
-impl FrameTypeInnerStatic for EventQueryJsonStringFrame {
- const FRAME_TYPE_ID: u32 = EVENT_QUERY_JSON_STRING_FRAME;
-}
-
-impl FrameType for EventQueryJsonStringFrame {
- fn frame_type_id(&self) -> u32 {
- EventQueryJsonStringFrame::FRAME_TYPE_ID
- }
-}
-
-#[derive(Clone, Debug, Deserialize)]
-pub struct IsoDateTime(chrono::DateTime);
-
-impl Serialize for IsoDateTime {
- fn serialize(&self, serializer: S) -> Result
- where
- S: Serializer,
- {
- serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string())
- }
-}
-
-pub fn make_iso_ts(tss: &[u64]) -> Vec {
- tss.iter()
- .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
- .collect()
-}
-
-pub enum Fits {
- Empty,
- Lower,
- Greater,
- Inside,
- PartlyLower,
- PartlyGreater,
- PartlyLowerAndGreater,
-}
-
-pub trait WithLen {
- fn len(&self) -> usize;
-}
-
-pub trait WithTimestamps {
- fn ts(&self, ix: usize) -> u64;
-}
-
-pub trait ByteEstimate {
- fn byte_estimate(&self) -> u64;
-}
-
-pub trait RangeOverlapInfo {
- // TODO do not take by value.
- fn ends_before(&self, range: NanoRange) -> bool;
- fn ends_after(&self, range: NanoRange) -> bool;
- fn starts_after(&self, range: NanoRange) -> bool;
-}
-
-pub trait FitsInside {
- fn fits_inside(&self, range: NanoRange) -> Fits;
-}
-
-pub trait FilterFittingInside: Sized {
- fn filter_fitting_inside(self, fit_range: NanoRange) -> Option;
-}
-
-pub trait PushableIndex {
- // TODO get rid of usage, involves copy.
- // TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop?
- fn push_index(&mut self, src: &Self, ix: usize);
-}
-
-pub trait NewEmpty {
- fn empty(shape: Shape) -> Self;
-}
-
-pub trait Appendable: WithLen {
- fn empty_like_self(&self) -> Self;
-
- // TODO get rid of usage, involves copy.
- fn append(&mut self, src: &Self);
-
- // TODO the `ts2` makes no sense for non-bin-implementors
- fn append_zero(&mut self, ts1: u64, ts2: u64);
-}
-
-pub trait Clearable {
- fn clear(&mut self);
-}
-
-pub trait EventAppendable
-where
- Self: Sized,
-{
- type Value;
- fn append_event(ret: Option, ts: u64, pulse: u64, value: Self::Value) -> Self;
-}
-
-pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside {
- fn ts1s(&self) -> &Vec;
- fn ts2s(&self) -> &Vec;
-}
-
-pub trait TimeBinnableType:
- Send
- + Unpin
- + RangeOverlapInfo
- + FilterFittingInside
- + NewEmpty
- + Appendable
- + Serialize
- + DeserializeOwned
- + ReadableFromFile
- + FrameTypeInnerStatic
-{
- type Output: TimeBinnableType;
- type Aggregator: TimeBinnableTypeAggregator + Send + Unpin;
- fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator;
-}
-
-/// Provides a time-binned representation of the implementing type.
-/// In contrast to `TimeBinnableType` this is meant for trait objects.
-
-// TODO should not require Sync!
-// TODO SitemtyFrameType is already supertrait of FramableInner.
-pub trait TimeBinnableDyn:
- fmt::Debug
- + FramableInner
- + FrameType
- + FrameTypeInnerDyn
- + WithLen
- + RangeOverlapInfo
- + Any
- + AsAnyRef
- + Sync
- + Send
- + 'static
-{
- fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box;
-}
-
-pub trait TimeBinnableDynStub:
- fmt::Debug
- + FramableInner
- + FrameType
- + FrameTypeInnerDyn
- + WithLen
- + RangeOverlapInfo
- + Any
- + AsAnyRef
- + Sync
- + Send
- + 'static
-{
-}
-
-// impl for the stubs TODO: remove
-impl TimeBinnableDyn for T
-where
- T: TimeBinnableDynStub,
-{
- fn time_binner_new(&self, _edges: Vec, _do_time_weight: bool) -> Box {
- error!("TODO impl time_binner_new for T {}", std::any::type_name::());
- err::todoval()
- }
-}
-
-// TODO maybe this is no longer needed:
-pub trait TimeBinnableDynAggregator: Send {
- fn ingest(&mut self, item: &dyn TimeBinnableDyn);
- fn result(&mut self) -> Box;
-}
-
-/// Container of some form of events, for use as trait object.
-pub trait EventsDyn: TimeBinnableDyn {
- fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn;
- fn verify(&self);
- fn output_info(&self);
-}
-
-/// Data in time-binned form.
-pub trait TimeBinned: TimeBinnableDyn {
- fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn;
- fn edges_slice(&self) -> (&[u64], &[u64]);
- fn counts(&self) -> &[u64];
- fn mins(&self) -> Vec;
- fn maxs(&self) -> Vec;
- fn avgs(&self) -> Vec;
- fn validate(&self) -> Result<(), String>;
-}
-
-impl FrameType for Box {
- fn frame_type_id(&self) -> u32 {
- FrameType::frame_type_id(self.as_ref())
- }
-}
-
-impl WithLen for Box {
- fn len(&self) -> usize {
- self.as_time_binnable_dyn().len()
- }
-}
-
-impl RangeOverlapInfo for Box {
- fn ends_before(&self, range: NanoRange) -> bool {
- self.as_time_binnable_dyn().ends_before(range)
- }
-
- fn ends_after(&self, range: NanoRange) -> bool {
- self.as_time_binnable_dyn().ends_after(range)
- }
-
- fn starts_after(&self, range: NanoRange) -> bool {
- self.as_time_binnable_dyn().starts_after(range)
- }
-}
-
-impl TimeBinnableDyn for Box {
- fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Box {
- self.as_time_binnable_dyn().time_binner_new(edges, do_time_weight)
- }
-}
-
-// TODO should get I/O and tokio dependence out of this crate
-pub trait ReadableFromFile: Sized {
- fn read_from_file(file: File) -> Result, Error>;
- // TODO should not need this:
- fn from_buf(buf: &[u8]) -> Result;
-}
-
-// TODO should get I/O and tokio dependence out of this crate
-pub struct ReadPbv
-where
- T: ReadableFromFile,
-{
- buf: Vec,
- all: Vec,
- file: Option,
- _m1: PhantomData,
-}
-
-impl ReadPbv
-where
- T: ReadableFromFile,
-{
- pub fn new(file: File) -> Self {
- Self {
- // TODO make buffer size a parameter:
- buf: vec![0; 1024 * 32],
- all: vec![],
- file: Some(file),
- _m1: PhantomData,
- }
- }
-}
-
-impl Future for ReadPbv
-where
- T: ReadableFromFile + Unpin,
-{
- type Output = Result>, Error>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll {
- use Poll::*;
- let mut buf = std::mem::replace(&mut self.buf, Vec::new());
- let ret = 'outer: loop {
- let mut dst = ReadBuf::new(&mut buf);
- if dst.remaining() == 0 || dst.capacity() == 0 {
- break Ready(Err(Error::with_msg("bad read buffer")));
- }
- let fp = self.file.as_mut().unwrap();
- let f = Pin::new(fp);
- break match File::poll_read(f, cx, &mut dst) {
- Ready(res) => match res {
- Ok(_) => {
- if dst.filled().len() > 0 {
- self.all.extend_from_slice(dst.filled());
- continue 'outer;
- } else {
- match T::from_buf(&mut self.all) {
- Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))),
- Err(e) => Ready(Err(e)),
- }
- }
- }
- Err(e) => Ready(Err(e.into())),
- },
- Pending => Pending,
- };
- };
- self.buf = buf;
- ret
- }
-}
-
-pub trait TimeBinnableTypeAggregator: Send {
- type Input: TimeBinnableType;
- type Output: TimeBinnableType;
- fn range(&self) -> &NanoRange;
- fn ingest(&mut self, item: &Self::Input);
- // TODO this API is too convoluted for a minimal performance gain: should separate `result` and `reset`
- // or simply require to construct a new which is almost equally expensive.
- fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output;
-}
-
-pub trait TimestampInspectable: WithTimestamps + WithLen {}
-
-impl TimestampInspectable for T where T: WithTimestamps + WithLen {}
-
-pub fn inspect_timestamps(events: &dyn TimestampInspectable, range: NanoRange) -> String {
- use fmt::Write;
- let rd = range.delta();
- let mut buf = String::new();
- let n = events.len();
- for i in 0..n {
- if i < 3 || i > (n - 4) {
- let ts = events.ts(i);
- let z = ts - range.beg;
- let z = z as f64 / rd as f64 * 2.0 - 1.0;
- write!(&mut buf, "i {:3} tt {:6.3}\n", i, z).unwrap();
- }
- }
- buf
-}
-
-pub trait TimeBinnerDyn: Send {
- fn bins_ready_count(&self) -> usize;
- fn bins_ready(&mut self) -> Option>;
- fn ingest(&mut self, item: &dyn TimeBinnableDyn);
-
- /// If there is a bin in progress with non-zero count, push it to the result set.
- /// With push_empty == true, a bin in progress is pushed even if it contains no counts.
- fn push_in_progress(&mut self, push_empty: bool);
-
- /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call
- /// to `push_in_progress` did not change the result count, as long as edges are left.
- /// The next call to `Self::bins_ready_count` must return one higher count than before.
- fn cycle(&mut self);
-}
diff --git a/items/src/lib.rs b/items/src/lib.rs
new file mode 100644
index 0000000..58a62f9
--- /dev/null
+++ b/items/src/lib.rs
@@ -0,0 +1,163 @@
+pub mod streams;
+
+use err::Error;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::StreamItem;
+#[allow(unused)]
+use netpod::log::*;
+use netpod::NanoRange;
+use netpod::Shape;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+use tokio::fs::File;
+use tokio::io::AsyncRead;
+use tokio::io::ReadBuf;
+
+pub enum Fits {
+ Empty,
+ Lower,
+ Greater,
+ Inside,
+ PartlyLower,
+ PartlyGreater,
+ PartlyLowerAndGreater,
+}
+
+pub trait WithLen {
+ fn len(&self) -> usize;
+}
+
+pub trait WithTimestamps {
+ fn ts(&self, ix: usize) -> u64;
+}
+
+pub trait ByteEstimate {
+ fn byte_estimate(&self) -> u64;
+}
+
+pub trait RangeOverlapInfo {
+ // TODO do not take by value.
+ fn ends_before(&self, range: NanoRange) -> bool;
+ fn ends_after(&self, range: NanoRange) -> bool;
+ fn starts_after(&self, range: NanoRange) -> bool;
+}
+
+pub trait FitsInside {
+ fn fits_inside(&self, range: NanoRange) -> Fits;
+}
+
+pub trait FilterFittingInside: Sized {
+ fn filter_fitting_inside(self, fit_range: NanoRange) -> Option;
+}
+
+pub trait PushableIndex {
+ // TODO get rid of usage, involves copy.
+ // TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop?
+ fn push_index(&mut self, src: &Self, ix: usize);
+}
+
+pub trait NewEmpty {
+ fn empty(shape: Shape) -> Self;
+}
+
+pub trait Appendable: WithLen {
+ fn empty_like_self(&self) -> Self;
+
+ // TODO get rid of usage, involves copy.
+ fn append(&mut self, src: &Self);
+
+ // TODO the `ts2` makes no sense for non-bin-implementors
+ fn append_zero(&mut self, ts1: u64, ts2: u64);
+}
+
+pub trait Clearable {
+ fn clear(&mut self);
+}
+
+pub trait EventAppendable
+where
+ Self: Sized,
+{
+ type Value;
+ fn append_event(ret: Option, ts: u64, pulse: u64, value: Self::Value) -> Self;
+}
+
+pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside {
+ fn ts1s(&self) -> &Vec;
+ fn ts2s(&self) -> &Vec;
+}
+
+// TODO should get I/O and tokio dependence out of this crate
+pub trait ReadableFromFile: Sized {
+ fn read_from_file(file: File) -> Result, Error>;
+ // TODO should not need this:
+ fn from_buf(buf: &[u8]) -> Result;
+}
+
+// TODO should get I/O and tokio dependence out of this crate
+pub struct ReadPbv
+where
+ T: ReadableFromFile,
+{
+ buf: Vec,
+ all: Vec,
+ file: Option,
+ _m1: PhantomData,
+}
+
+impl ReadPbv
+where
+ T: ReadableFromFile,
+{
+ pub fn new(file: File) -> Self {
+ Self {
+ // TODO make buffer size a parameter:
+ buf: vec![0; 1024 * 32],
+ all: vec![],
+ file: Some(file),
+ _m1: PhantomData,
+ }
+ }
+}
+
+impl Future for ReadPbv
+where
+ T: ReadableFromFile + Unpin,
+{
+ type Output = Result>, Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll {
+ use Poll::*;
+ let mut buf = std::mem::replace(&mut self.buf, Vec::new());
+ let ret = 'outer: loop {
+ let mut dst = ReadBuf::new(&mut buf);
+ if dst.remaining() == 0 || dst.capacity() == 0 {
+ break Ready(Err(Error::with_msg("bad read buffer")));
+ }
+ let fp = self.file.as_mut().unwrap();
+ let f = Pin::new(fp);
+ break match File::poll_read(f, cx, &mut dst) {
+ Ready(res) => match res {
+ Ok(_) => {
+ if dst.filled().len() > 0 {
+ self.all.extend_from_slice(dst.filled());
+ continue 'outer;
+ } else {
+ match T::from_buf(&mut self.all) {
+ Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))),
+ Err(e) => Ready(Err(e)),
+ }
+ }
+ }
+ Err(e) => Ready(Err(e.into())),
+ },
+ Pending => Pending,
+ };
+ };
+ self.buf = buf;
+ ret
+ }
+}
diff --git a/items/src/streams.rs b/items/src/streams.rs
index 3494896..c18d002 100644
--- a/items/src/streams.rs
+++ b/items/src/streams.rs
@@ -1,7 +1,13 @@
-use crate::{RangeCompletableItem, Sitemty, StreamItem, WithLen};
+use crate::RangeCompletableItem;
+use crate::StreamItem;
+use crate::WithLen;
use err::Error;
-use futures_util::{Stream, StreamExt};
+use futures_util::Stream;
+use futures_util::StreamExt;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StatsItem;
use netpod::log::*;
+use netpod::DiskStats;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::fmt;
@@ -98,8 +104,6 @@ where
}
}
StreamItem::Stats(item) => {
- use crate::StatsItem;
- use netpod::DiskStats;
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}
diff --git a/items_0/Cargo.toml b/items_0/Cargo.toml
index 4e2c6f8..78cf410 100644
--- a/items_0/Cargo.toml
+++ b/items_0/Cargo.toml
@@ -12,5 +12,7 @@ serde = { version = "1.0", features = ["derive"] }
erased-serde = "0.3"
serde_json = "1.0"
bincode = "1.3.3"
+bytes = "1.2.1"
+chrono = { version = "0.4.19", features = ["serde"] }
netpod = { path = "../netpod" }
err = { path = "../err" }
diff --git a/items_0/src/framable.rs b/items_0/src/framable.rs
new file mode 100644
index 0000000..5fd10cf
--- /dev/null
+++ b/items_0/src/framable.rs
@@ -0,0 +1,19 @@
+// Required for any inner type of Sitemty.
+pub trait FrameTypeInnerStatic {
+ const FRAME_TYPE_ID: u32;
+}
+
+// To be implemented by the T of Sitemty, e.g. ScalarEvents.
+pub trait FrameTypeInnerDyn {
+ // TODO check actual usage of this
+ fn frame_type_id(&self) -> u32;
+}
+
+impl FrameTypeInnerDyn for T
+where
+ T: FrameTypeInnerStatic,
+{
+ fn frame_type_id(&self) -> u32 {
+ ::FRAME_TYPE_ID
+ }
+}
diff --git a/items_0/src/isodate.rs b/items_0/src/isodate.rs
new file mode 100644
index 0000000..daeffd7
--- /dev/null
+++ b/items_0/src/isodate.rs
@@ -0,0 +1,24 @@
+use chrono::DateTime;
+use chrono::TimeZone;
+use chrono::Utc;
+use serde::Deserialize;
+use serde::Serialize;
+use serde::Serializer;
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct IsoDateTime(DateTime);
+
+impl Serialize for IsoDateTime {
+ fn serialize(&self, serializer: S) -> Result
+ where
+ S: Serializer,
+ {
+ serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string())
+ }
+}
+
+pub fn make_iso_ts(tss: &[u64]) -> Vec {
+ tss.iter()
+ .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
+ .collect()
+}
diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs
index e8a571d..985deb5 100644
--- a/items_0/src/items_0.rs
+++ b/items_0/src/items_0.rs
@@ -1,6 +1,9 @@
pub mod collect_c;
pub mod collect_s;
+pub mod framable;
+pub mod isodate;
pub mod scalar_ops;
+pub mod streamitem;
pub mod subfr;
pub mod bincode {
@@ -91,7 +94,7 @@ where
}
/// Data in time-binned form.
-pub trait TimeBinned: Any + TimeBinnable + crate::collect_c::Collectable {
+pub trait TimeBinned: Any + TimeBinnable + crate::collect_c::Collectable + erased_serde::Serialize {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;
fn edges_slice(&self) -> (&[u64], &[u64]);
diff --git a/items_0/src/streamitem.rs b/items_0/src/streamitem.rs
new file mode 100644
index 0000000..ade3362
--- /dev/null
+++ b/items_0/src/streamitem.rs
@@ -0,0 +1,179 @@
+use crate::TimeBinned;
+use err::Error;
+use netpod::log::Level;
+use netpod::DiskStats;
+use netpod::EventDataReadStats;
+use netpod::RangeFilterStats;
+use serde::Deserialize;
+use serde::Serialize;
+
+pub const TERM_FRAME_TYPE_ID: u32 = 0xaa01;
+pub const ERROR_FRAME_TYPE_ID: u32 = 0xaa02;
+pub const SITEMTY_NONSPEC_FRAME_TYPE_ID: u32 = 0xaa04;
+pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100;
+pub const EVENTS_0D_FRAME_TYPE_ID: u32 = 0x500;
+pub const MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID: u32 = 0x700;
+pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0x800;
+pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00;
+pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0xb00;
+pub const LOG_FRAME_TYPE_ID: u32 = 0xc00;
+pub const STATS_FRAME_TYPE_ID: u32 = 0xd00;
+pub const RANGE_COMPLETE_FRAME_TYPE_ID: u32 = 0xe00;
+pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200;
+pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300;
+pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400;
+pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500;
+pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800;
+pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900;
+pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00;
+
+pub fn bool_is_false(j: &bool) -> bool {
+ *j == false
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub enum RangeCompletableItem {
+ RangeComplete,
+ Data(T),
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub enum StatsItem {
+ EventDataReadStats(EventDataReadStats),
+ RangeFilterStats(RangeFilterStats),
+ DiskStats(DiskStats),
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub enum StreamItem {
+ DataItem(T),
+ Log(LogItem),
+ Stats(StatsItem),
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct LogItem {
+ pub node_ix: u32,
+ #[serde(with = "levelserde")]
+ pub level: Level,
+ pub msg: String,
+}
+
+impl LogItem {
+ pub fn quick(level: Level, msg: String) -> Self {
+ Self {
+ level,
+ msg,
+ node_ix: 42,
+ }
+ }
+}
+
+pub type Sitemty = Result>, Error>;
+
+#[macro_export]
+macro_rules! on_sitemty_range_complete {
+ ($item:expr, $ex:expr) => {
+ if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item {
+ $ex
+ }
+ };
+}
+
+pub fn sitem_data(x: X) -> Sitemty {
+ Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
+}
+
+mod levelserde {
+ use super::Level;
+ use serde::de::{self, Visitor};
+ use serde::{Deserializer, Serializer};
+ use std::fmt;
+
+ pub fn serialize(t: &Level, se: S) -> Result
+ where
+ S: Serializer,
+ {
+ let g = match *t {
+ Level::ERROR => 1,
+ Level::WARN => 2,
+ Level::INFO => 3,
+ Level::DEBUG => 4,
+ Level::TRACE => 5,
+ };
+ se.serialize_u32(g)
+ }
+
+ struct VisitLevel;
+
+ impl VisitLevel {
+ fn from_u32(x: u32) -> Level {
+ match x {
+ 1 => Level::ERROR,
+ 2 => Level::WARN,
+ 3 => Level::INFO,
+ 4 => Level::DEBUG,
+ 5 => Level::TRACE,
+ _ => Level::TRACE,
+ }
+ }
+ }
+
+ impl<'de> Visitor<'de> for VisitLevel {
+ type Value = Level;
+
+ fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "expect Level code")
+ }
+
+ fn visit_u64(self, val: u64) -> Result
+ where
+ E: de::Error,
+ {
+ Ok(VisitLevel::from_u32(val as _))
+ }
+ }
+
+ pub fn deserialize<'de, D>(de: D) -> Result
+ where
+ D: Deserializer<'de>,
+ {
+ de.deserialize_u32(VisitLevel)
+ }
+}
+
+pub trait ContainsError {
+ fn is_err(&self) -> bool;
+ fn err(&self) -> Option<&::err::Error>;
+}
+
+impl ContainsError for Box
+where
+ T: ContainsError,
+{
+ fn is_err(&self) -> bool {
+ self.as_ref().is_err()
+ }
+
+ fn err(&self) -> Option<&::err::Error> {
+ self.as_ref().err()
+ }
+}
+
+impl ContainsError for Sitemty {
+ fn is_err(&self) -> bool {
+ match self {
+ Ok(_) => false,
+ Err(_) => true,
+ }
+ }
+
+ fn err(&self) -> Option<&::err::Error> {
+ match self {
+ Ok(_) => None,
+ Err(e) => Some(e),
+ }
+ }
+}
+
+erased_serde::serialize_trait_object!(TimeBinned);
diff --git a/items_2/Cargo.toml b/items_2/Cargo.toml
index da49755..bc4ca4a 100644
--- a/items_2/Cargo.toml
+++ b/items_2/Cargo.toml
@@ -20,9 +20,11 @@ crc32fast = "1.3.2"
futures-util = "0.3.24"
tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] }
humantime-serde = "1.1.1"
+rmp-serde = "1.1.1"
err = { path = "../err" }
items = { path = "../items" }
items_0 = { path = "../items_0" }
items_proc = { path = "../items_proc" }
netpod = { path = "../netpod" }
taskrun = { path = "../taskrun" }
+parse = { path = "../parse" }
diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs
index fc77ad8..bf4b339 100644
--- a/items_2/src/channelevents.rs
+++ b/items_2/src/channelevents.rs
@@ -1,9 +1,10 @@
+use crate::framable::FrameType;
use crate::merger;
use crate::Events;
-use items::FrameType;
-use items::FrameTypeInnerStatic;
use items_0::collect_s::Collectable;
use items_0::collect_s::Collector;
+use items_0::framable::FrameTypeInnerStatic;
+use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use netpod::log::*;
@@ -88,7 +89,7 @@ pub enum ChannelEvents {
}
impl FrameTypeInnerStatic for ChannelEvents {
- const FRAME_TYPE_ID: u32 = items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
+ const FRAME_TYPE_ID: u32 = ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
}
impl FrameType for ChannelEvents {
diff --git a/items_2/src/databuffereventblobs.rs b/items_2/src/databuffereventblobs.rs
deleted file mode 100644
index c1eaeea..0000000
--- a/items_2/src/databuffereventblobs.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-use items::FrameType;
-use items::FrameTypeInnerStatic;
-use serde::Serialize;
-
-pub struct DatabufferEventBlob {}
-
-impl FrameTypeInnerStatic for DatabufferEventBlob {
- const FRAME_TYPE_ID: u32 = items::DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID;
-}
-
-impl FrameType for DatabufferEventBlob {
- fn frame_type_id(&self) -> u32 {
- ::FRAME_TYPE_ID
- }
-}
-
-impl Serialize for DatabufferEventBlob {
- fn serialize(&self, _serializer: S) -> Result
- where
- S: serde::Serializer,
- {
- todo!()
- }
-}
diff --git a/items/src/eventfull.rs b/items_2/src/eventfull.rs
similarity index 92%
rename from items/src/eventfull.rs
rename to items_2/src/eventfull.rs
index 6b62a16..7685343 100644
--- a/items/src/eventfull.rs
+++ b/items_2/src/eventfull.rs
@@ -1,12 +1,12 @@
-use crate::Appendable;
-use crate::ByteEstimate;
-use crate::Clearable;
-use crate::FrameType;
-use crate::FrameTypeInnerStatic;
-use crate::PushableIndex;
-use crate::WithLen;
-use crate::WithTimestamps;
+use crate::framable::FrameType;
use bytes::BytesMut;
+use items::ByteEstimate;
+use items::Clearable;
+use items::PushableIndex;
+use items::WithTimestamps;
+use items_0::framable::FrameTypeInnerStatic;
+use items_0::streamitem::EVENT_FULL_FRAME_TYPE_ID;
+use items_0::WithLen;
use netpod::ScalarType;
use netpod::Shape;
use parse::channelconfig::CompressionMethod;
@@ -123,7 +123,7 @@ impl EventFull {
}
impl FrameTypeInnerStatic for EventFull {
- const FRAME_TYPE_ID: u32 = crate::EVENT_FULL_FRAME_TYPE_ID;
+ const FRAME_TYPE_ID: u32 = EVENT_FULL_FRAME_TYPE_ID;
}
impl FrameType for EventFull {
@@ -138,7 +138,13 @@ impl WithLen for EventFull {
}
}
-impl Appendable for EventFull {
+impl items::WithLen for EventFull {
+ fn len(&self) -> usize {
+ self.tss.len()
+ }
+}
+
+impl items::Appendable for EventFull {
fn empty_like_self(&self) -> Self {
Self::empty()
}
diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs
index e2fba61..15b9acf 100644
--- a/items_2/src/eventsdim0.rs
+++ b/items_2/src/eventsdim0.rs
@@ -1155,16 +1155,15 @@ impl items_0::collect_c::Collectable for EventsDim0 {
#[cfg(test)]
mod test_frame {
+ use super::*;
use crate::channelevents::ChannelEvents;
- use crate::eventsdim0::EventsDim0;
- use err::Error;
- use items::Framable;
- use items::RangeCompletableItem;
- use items::Sitemty;
- use items::StreamItem;
- use items_0::AsAnyMut;
- use items_0::Empty;
- use items_0::Events;
+ use crate::framable::Framable;
+ use crate::framable::INMEM_FRAME_ENCID;
+ use crate::frame::decode_frame;
+ use crate::inmem::InMemoryFrame;
+ use items_0::streamitem::RangeCompletableItem;
+ use items_0::streamitem::Sitemty;
+ use items_0::streamitem::StreamItem;
#[test]
fn events_bincode() {
@@ -1180,13 +1179,13 @@ mod test_frame {
let s = String::from_utf8_lossy(&buf[20..buf.len() - 4]);
eprintln!("[[{s}]]");
let buflen = buf.len();
- let frame = items::inmem::InMemoryFrame {
- encid: items::INMEM_FRAME_ENCID,
+ let frame = InMemoryFrame {
+ encid: INMEM_FRAME_ENCID,
tyid: 0x2500,
len: (buflen - 24) as _,
buf: buf.split_off(20).split_to(buflen - 20 - 4).freeze(),
};
- let item: Sitemty = items::frame::decode_frame(&frame).unwrap();
+ let item: Sitemty = decode_frame(&frame).unwrap();
let item = if let Ok(x) = item { x } else { panic!() };
let item = if let StreamItem::DataItem(x) = item {
x
@@ -1208,31 +1207,34 @@ mod test_frame {
} else {
panic!()
};
- eprintln!("NOW WE SEE: {:?}", item);
- // type_name_of_val alloc::boxed::Box
- eprintln!("0 {:22?}", item.as_any_mut().type_id());
- eprintln!("A {:22?}", std::any::TypeId::of::>());
- eprintln!("B {:22?}", std::any::TypeId::of::());
- eprintln!("C {:22?}", std::any::TypeId::of::<&dyn items_0::Events>());
- eprintln!("D {:22?}", std::any::TypeId::of::<&mut dyn items_0::Events>());
- eprintln!("E {:22?}", std::any::TypeId::of::<&mut Box>());
- eprintln!("F {:22?}", std::any::TypeId::of::>>());
- eprintln!("G {:22?}", std::any::TypeId::of::<&EventsDim0>());
- eprintln!("H {:22?}", std::any::TypeId::of::<&mut EventsDim0>());
- eprintln!("I {:22?}", std::any::TypeId::of::>>>());
- //let item = item.as_mut();
- //eprintln!("1 {:22?}", item.type_id());
- /*
- let item = if let Some(item) =
- items_0::collect_s::Collectable::as_any_mut(item).downcast_ref::>>()
- {
- item
- } else {
- panic!()
- };
- */
- eprintln!("Final value: {item:?}");
assert_eq!(item.tss(), &[123]);
+ #[cfg(DISABLED)]
+ {
+ eprintln!("NOW WE SEE: {:?}", item);
+ // type_name_of_val alloc::boxed::Box
+ eprintln!("0 {:22?}", item.as_any_mut().type_id());
+ eprintln!("A {:22?}", std::any::TypeId::of::>());
+ eprintln!("B {:22?}", std::any::TypeId::of::());
+ eprintln!("C {:22?}", std::any::TypeId::of::<&dyn items_0::Events>());
+ eprintln!("D {:22?}", std::any::TypeId::of::<&mut dyn items_0::Events>());
+ eprintln!("E {:22?}", std::any::TypeId::of::<&mut Box>());
+ eprintln!("F {:22?}", std::any::TypeId::of::>>());
+ eprintln!("G {:22?}", std::any::TypeId::of::<&EventsDim0>());
+ eprintln!("H {:22?}", std::any::TypeId::of::<&mut EventsDim0>());
+ eprintln!("I {:22?}", std::any::TypeId::of::>>>());
+ //let item = item.as_mut();
+ //eprintln!("1 {:22?}", item.type_id());
+ /*
+ let item = if let Some(item) =
+ items_0::collect_s::Collectable::as_any_mut(item).downcast_ref::>>()
+ {
+ item
+ } else {
+ panic!()
+ };
+ */
+ //eprintln!("Final value: {item:?}");
+ }
}
}
diff --git a/items_2/src/framable.rs b/items_2/src/framable.rs
new file mode 100644
index 0000000..4cdaf6e
--- /dev/null
+++ b/items_2/src/framable.rs
@@ -0,0 +1,150 @@
+use crate::frame::make_error_frame;
+use crate::frame::make_frame_2;
+use crate::frame::make_log_frame;
+use crate::frame::make_range_complete_frame;
+use crate::frame::make_stats_frame;
+use bytes::BytesMut;
+use err::Error;
+use items_0::framable::FrameTypeInnerDyn;
+use items_0::framable::FrameTypeInnerStatic;
+use items_0::streamitem::LogItem;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::StreamItem;
+use items_0::streamitem::ERROR_FRAME_TYPE_ID;
+use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME;
+use items_0::streamitem::SITEMTY_NONSPEC_FRAME_TYPE_ID;
+use serde::de::DeserializeOwned;
+use serde::Deserialize;
+use serde::Serialize;
+
+pub const INMEM_FRAME_ENCID: u32 = 0x12121212;
+pub const INMEM_FRAME_HEAD: usize = 20;
+pub const INMEM_FRAME_FOOT: usize = 4;
+pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d;
+
+pub trait FrameTypeStatic {
+ const FRAME_TYPE_ID: u32;
+}
+
+impl FrameTypeStatic for Sitemty
+where
+ T: FrameTypeInnerStatic,
+{
+ const FRAME_TYPE_ID: u32 = ::FRAME_TYPE_ID;
+}
+
+// Framable trait objects need some inspection to handle the supposed-to-be common Err ser format:
+// Meant to be implemented by Sitemty.
+pub trait FrameType {
+ fn frame_type_id(&self) -> u32;
+}
+
+impl FrameType for Box
+where
+ T: FrameType,
+{
+ fn frame_type_id(&self) -> u32 {
+ self.as_ref().frame_type_id()
+ }
+}
+
+pub trait Framable {
+ fn make_frame(&self) -> Result;
+}
+
+pub trait FramableInner: erased_serde::Serialize + FrameTypeInnerDyn + Send {
+ fn _dummy(&self);
+}
+
+impl FramableInner for T {
+ fn _dummy(&self) {}
+}
+
+impl Framable for Sitemty
+where
+ T: Sized + serde::Serialize + FrameType,
+{
+ fn make_frame(&self) -> Result {
+ match self {
+ Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => {
+ let frame_type_id = k.frame_type_id();
+ make_frame_2(self, frame_type_id)
+ }
+ Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => make_range_complete_frame(),
+ Ok(StreamItem::Log(item)) => make_log_frame(item),
+ Ok(StreamItem::Stats(item)) => make_stats_frame(item),
+ Err(e) => make_error_frame(e),
+ }
+ }
+}
+
+impl Framable for Box
+where
+ T: Framable + ?Sized,
+{
+ fn make_frame(&self) -> Result {
+ self.as_ref().make_frame()
+ }
+}
+
+pub trait FrameDecodable: FrameTypeStatic + DeserializeOwned {
+ fn from_error(e: ::err::Error) -> Self;
+ fn from_log(item: LogItem) -> Self;
+ fn from_stats(item: StatsItem) -> Self;
+ fn from_range_complete() -> Self;
+}
+
+impl FrameDecodable for Sitemty
+where
+ T: FrameTypeInnerStatic + DeserializeOwned,
+{
+ fn from_error(e: err::Error) -> Self {
+ Err(e)
+ }
+
+ fn from_log(item: LogItem) -> Self {
+ Ok(StreamItem::Log(item))
+ }
+
+ fn from_stats(item: StatsItem) -> Self {
+ Ok(StreamItem::Stats(item))
+ }
+
+ fn from_range_complete() -> Self {
+ Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
+ }
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct EventQueryJsonStringFrame(pub String);
+
+impl FrameTypeInnerStatic for EventQueryJsonStringFrame {
+ const FRAME_TYPE_ID: u32 = EVENT_QUERY_JSON_STRING_FRAME;
+}
+
+impl FrameType for EventQueryJsonStringFrame {
+ fn frame_type_id(&self) -> u32 {
+ EventQueryJsonStringFrame::FRAME_TYPE_ID
+ }
+}
+
+impl FrameType for Sitemty
+where
+ T: FrameType,
+{
+ fn frame_type_id(&self) -> u32 {
+ match self {
+ Ok(item) => match item {
+ StreamItem::DataItem(item) => match item {
+ RangeCompletableItem::RangeComplete => SITEMTY_NONSPEC_FRAME_TYPE_ID,
+ RangeCompletableItem::Data(item) => item.frame_type_id(),
+ },
+ StreamItem::Log(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID,
+ StreamItem::Stats(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID,
+ },
+ Err(_) => ERROR_FRAME_TYPE_ID,
+ }
+ }
+}
diff --git a/items/src/frame.rs b/items_2/src/frame.rs
similarity index 93%
rename from items/src/frame.rs
rename to items_2/src/frame.rs
index d80dffd..519cde5 100644
--- a/items/src/frame.rs
+++ b/items_2/src/frame.rs
@@ -1,13 +1,28 @@
+use crate::framable::FrameDecodable;
+use crate::framable::FrameType;
+use crate::framable::INMEM_FRAME_ENCID;
+use crate::framable::INMEM_FRAME_HEAD;
+use crate::framable::INMEM_FRAME_MAGIC;
use crate::inmem::InMemoryFrame;
-use crate::{ContainsError, FrameDecodable, FrameType, LogItem, StatsItem};
-use crate::{ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};
-use crate::{LOG_FRAME_TYPE_ID, RANGE_COMPLETE_FRAME_TYPE_ID, STATS_FRAME_TYPE_ID, TERM_FRAME_TYPE_ID};
-use bincode::config::{FixintEncoding, LittleEndian, RejectTrailing};
-use bincode::config::{WithOtherEndian, WithOtherIntEncoding, WithOtherTrailing};
+use bincode::config::FixintEncoding;
+use bincode::config::LittleEndian;
+use bincode::config::RejectTrailing;
+use bincode::config::WithOtherEndian;
+use bincode::config::WithOtherIntEncoding;
+use bincode::config::WithOtherTrailing;
use bincode::DefaultOptions;
-use bytes::{BufMut, BytesMut};
+use bytes::BufMut;
+use bytes::BytesMut;
use err::Error;
use items_0::bincode;
+use items_0::streamitem::ContainsError;
+use items_0::streamitem::LogItem;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::ERROR_FRAME_TYPE_ID;
+use items_0::streamitem::LOG_FRAME_TYPE_ID;
+use items_0::streamitem::RANGE_COMPLETE_FRAME_TYPE_ID;
+use items_0::streamitem::STATS_FRAME_TYPE_ID;
+use items_0::streamitem::TERM_FRAME_TYPE_ID;
#[allow(unused)]
use netpod::log::*;
use serde::Serialize;
diff --git a/items/src/inmem.rs b/items_2/src/inmem.rs
similarity index 100%
rename from items/src/inmem.rs
rename to items_2/src/inmem.rs
diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs
index d5b4be0..418c896 100644
--- a/items_2/src/items_2.rs
+++ b/items_2/src/items_2.rs
@@ -1,11 +1,14 @@
pub mod binsdim0;
pub mod binsxbindim0;
pub mod channelevents;
-pub mod databuffereventblobs;
+pub mod eventfull;
pub mod eventsdim0;
pub mod eventsdim1;
pub mod eventsxbindim0;
pub mod eventtransform;
+pub mod framable;
+pub mod frame;
+pub mod inmem;
pub mod merger;
pub mod streams;
#[cfg(test)]
@@ -20,11 +23,11 @@ use chrono::Utc;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
use items_0::collect_s::Collector;
use items_0::collect_s::ToJsonResult;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
use items_0::Empty;
use items_0::Events;
use items_0::RangeOverlapInfo;
diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs
index 8a6c7da..f283d6e 100644
--- a/items_2/src/merger.rs
+++ b/items_2/src/merger.rs
@@ -1,10 +1,10 @@
use crate::Error;
use futures_util::Stream;
use futures_util::StreamExt;
-use items::sitem_data;
-use items::RangeCompletableItem;
-use items::Sitemty;
-use items::StreamItem;
+use items_0::streamitem::sitem_data;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::Sitemty;
+use items_0::streamitem::StreamItem;
use netpod::log::*;
use std::collections::VecDeque;
use std::fmt;
@@ -359,7 +359,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll