This commit is contained in:
Dominik Werder
2023-02-28 09:47:27 +01:00
parent 9c1522f9bb
commit c54eaa6fcb
48 changed files with 909 additions and 940 deletions

View File

@@ -1,19 +1,15 @@
[package]
name = "items"
version = "0.0.2"
version = "0.1.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/items.rs"
[dependencies]
tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
futures-util = "0.3.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
ciborium = "0.2"
rmp-serde = "1.1.1"
bson = "2.4.0"
erased-serde = "0.3"
bytes = "1.2.1"
@@ -24,4 +20,3 @@ err = { path = "../err" }
items_proc = { path = "../items_proc" }
items_0 = { path = "../items_0" }
netpod = { path = "../netpod" }
parse = { path = "../parse" }

View File

@@ -1,208 +0,0 @@
use crate::Appendable;
use crate::ByteEstimate;
use crate::Clearable;
use crate::FrameType;
use crate::FrameTypeInnerStatic;
use crate::PushableIndex;
use crate::WithLen;
use crate::WithTimestamps;
use bytes::BytesMut;
use netpod::ScalarType;
use netpod::Shape;
use parse::channelconfig::CompressionMethod;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;
use std::collections::VecDeque;
#[derive(Debug, Serialize, Deserialize)]
pub struct EventFull {
pub tss: VecDeque<u64>,
pub pulses: VecDeque<u64>,
pub blobs: VecDeque<Option<Vec<u8>>>,
//#[serde(with = "decomps_serde")]
// TODO allow access to `decomps` via method which checks first if `blobs` is already the decomp.
pub decomps: VecDeque<Option<Vec<u8>>>,
pub scalar_types: VecDeque<ScalarType>,
pub be: VecDeque<bool>,
pub shapes: VecDeque<Shape>,
pub comps: VecDeque<Option<CompressionMethod>>,
}
#[allow(unused)]
mod decomps_serde {
use super::*;
pub fn serialize<S>(t: &VecDeque<Option<BytesMut>>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let a: Vec<_> = t
.iter()
.map(|k| match k {
None => None,
Some(j) => Some(j[..].to_vec()),
})
.collect();
Serialize::serialize(&a, s)
}
pub fn deserialize<'de, D>(d: D) -> Result<VecDeque<Option<BytesMut>>, D::Error>
where
D: Deserializer<'de>,
{
let a: Vec<Option<Vec<u8>>> = Deserialize::deserialize(d)?;
let a = a
.iter()
.map(|k| match k {
None => None,
Some(j) => {
let mut a = BytesMut::new();
a.extend_from_slice(&j);
Some(a)
}
})
.collect();
Ok(a)
}
}
impl EventFull {
pub fn empty() -> Self {
Self {
tss: VecDeque::new(),
pulses: VecDeque::new(),
blobs: VecDeque::new(),
decomps: VecDeque::new(),
scalar_types: VecDeque::new(),
be: VecDeque::new(),
shapes: VecDeque::new(),
comps: VecDeque::new(),
}
}
pub fn add_event(
&mut self,
ts: u64,
pulse: u64,
blob: Option<Vec<u8>>,
decomp: Option<Vec<u8>>,
scalar_type: ScalarType,
be: bool,
shape: Shape,
comp: Option<CompressionMethod>,
) {
self.tss.push_back(ts);
self.pulses.push_back(pulse);
self.blobs.push_back(blob);
self.decomps.push_back(decomp);
self.scalar_types.push_back(scalar_type);
self.be.push_back(be);
self.shapes.push_back(shape);
self.comps.push_back(comp);
}
pub fn truncate_ts(&mut self, end: u64) {
let mut nkeep = usize::MAX;
for (i, &ts) in self.tss.iter().enumerate() {
if ts >= end {
nkeep = i;
break;
}
}
self.tss.truncate(nkeep);
self.pulses.truncate(nkeep);
self.blobs.truncate(nkeep);
self.decomps.truncate(nkeep);
self.scalar_types.truncate(nkeep);
self.be.truncate(nkeep);
self.shapes.truncate(nkeep);
self.comps.truncate(nkeep);
}
}
impl FrameTypeInnerStatic for EventFull {
const FRAME_TYPE_ID: u32 = crate::EVENT_FULL_FRAME_TYPE_ID;
}
impl FrameType for EventFull {
fn frame_type_id(&self) -> u32 {
<Self as FrameTypeInnerStatic>::FRAME_TYPE_ID
}
}
impl WithLen for EventFull {
fn len(&self) -> usize {
self.tss.len()
}
}
impl Appendable for EventFull {
fn empty_like_self(&self) -> Self {
Self::empty()
}
// TODO expensive, get rid of it.
fn append(&mut self, src: &Self) {
self.tss.extend(&src.tss);
self.pulses.extend(&src.pulses);
self.blobs.extend(src.blobs.iter().map(Clone::clone));
self.decomps.extend(src.decomps.iter().map(Clone::clone));
self.scalar_types.extend(src.scalar_types.iter().map(Clone::clone));
self.be.extend(&src.be);
self.shapes.extend(src.shapes.iter().map(Clone::clone));
self.comps.extend(src.comps.iter().map(Clone::clone));
}
fn append_zero(&mut self, _ts1: u64, _ts2: u64) {
// TODO do we still need this type?
todo!()
}
}
impl Clearable for EventFull {
fn clear(&mut self) {
self.tss.clear();
self.pulses.clear();
self.blobs.clear();
self.decomps.clear();
self.scalar_types.clear();
self.be.clear();
self.shapes.clear();
self.comps.clear();
}
}
impl WithTimestamps for EventFull {
fn ts(&self, ix: usize) -> u64 {
self.tss[ix]
}
}
impl ByteEstimate for EventFull {
fn byte_estimate(&self) -> u64 {
if self.len() == 0 {
0
} else {
// TODO that is clumsy... it assumes homogenous types.
// TODO improve via a const fn on NTY
let decomp_len = self.decomps[0].as_ref().map_or(0, |h| h.len());
self.tss.len() as u64 * (40 + self.blobs[0].as_ref().map_or(0, |x| x.len()) as u64 + decomp_len as u64)
}
}
}
impl PushableIndex for EventFull {
// TODO check all use cases, can't we move?
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push_back(src.tss[ix]);
self.pulses.push_back(src.pulses[ix]);
self.blobs.push_back(src.blobs[ix].clone());
self.decomps.push_back(src.decomps[ix].clone());
self.scalar_types.push_back(src.scalar_types[ix].clone());
self.be.push_back(src.be[ix]);
self.shapes.push_back(src.shapes[ix].clone());
self.comps.push_back(src.comps[ix].clone());
}
}

View File

@@ -1,357 +0,0 @@
use crate::inmem::InMemoryFrame;
use crate::{ContainsError, FrameDecodable, FrameType, LogItem, StatsItem};
use crate::{ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};
use crate::{LOG_FRAME_TYPE_ID, RANGE_COMPLETE_FRAME_TYPE_ID, STATS_FRAME_TYPE_ID, TERM_FRAME_TYPE_ID};
use bincode::config::{FixintEncoding, LittleEndian, RejectTrailing};
use bincode::config::{WithOtherEndian, WithOtherIntEncoding, WithOtherTrailing};
use bincode::DefaultOptions;
use bytes::{BufMut, BytesMut};
use err::Error;
use items_0::bincode;
#[allow(unused)]
use netpod::log::*;
use serde::Serialize;
trait EC {
fn ec(self) -> err::Error;
}
impl EC for rmp_serde::encode::Error {
fn ec(self) -> err::Error {
err::Error::with_msg_no_trace(format!("{self:?}"))
}
}
impl EC for rmp_serde::decode::Error {
fn ec(self) -> err::Error {
err::Error::with_msg_no_trace(format!("{self:?}"))
}
}
pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
where
FT: FrameType + ContainsError + Serialize,
{
if item.is_err() {
make_error_frame(item.err().unwrap())
} else {
make_frame_2(item, item.frame_type_id())
}
}
pub fn bincode_ser<W>(
w: W,
) -> bincode::Serializer<
W,
WithOtherTrailing<
WithOtherIntEncoding<WithOtherEndian<DefaultOptions, LittleEndian>, FixintEncoding>,
RejectTrailing,
>,
>
where
W: std::io::Write,
{
use bincode::Options;
let opts = DefaultOptions::new()
.with_little_endian()
.with_fixint_encoding()
.reject_trailing_bytes();
let ser = bincode::Serializer::new(w, opts);
ser
}
pub fn bincode_to_vec<S>(item: S) -> Result<Vec<u8>, Error>
where
S: Serialize,
{
let mut out = Vec::new();
let mut ser = bincode_ser(&mut out);
item.serialize(&mut ser).map_err(|e| format!("{e}"))?;
Ok(out)
}
pub fn bincode_from_slice<T>(buf: &[u8]) -> Result<T, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
use bincode::Options;
let opts = DefaultOptions::new()
.with_little_endian()
.with_fixint_encoding()
.reject_trailing_bytes();
let mut de = bincode::Deserializer::from_slice(buf, opts);
<T as serde::Deserialize>::deserialize(&mut de).map_err(|e| format!("{e}").into())
}
pub fn encode_to_vec<S>(item: S) -> Result<Vec<u8>, Error>
where
S: Serialize,
{
if false {
serde_json::to_vec(&item).map_err(|e| e.into())
} else {
bincode_to_vec(&item)
}
}
pub fn decode_from_slice<T>(buf: &[u8]) -> Result<T, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
if false {
serde_json::from_slice(buf).map_err(|e| e.into())
} else {
bincode_from_slice(buf)
}
}
pub fn make_frame_2<T>(item: &T, fty: u32) -> Result<BytesMut, Error>
where
T: erased_serde::Serialize,
{
let mut out = Vec::new();
//let mut ser = rmp_serde::Serializer::new(&mut out).with_struct_map();
//let writer = ciborium::ser::into_writer(&item, &mut out).unwrap();
let mut ser = bincode_ser(&mut out);
let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser);
//let mut ser = serde_json::Serializer::new(&mut out);
//let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser);
match item.erased_serialize(&mut ser2) {
Ok(_) => {
let enc = out;
if enc.len() > u32::MAX as usize {
return Err(Error::with_msg(format!("too long payload {}", enc.len())));
}
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();
// TODO reserve also for footer via constant
let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(INMEM_FRAME_ENCID);
buf.put_u32_le(fty);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
//trace!("enc len {}", enc.len());
//trace!("payload_crc {}", payload_crc);
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
//trace!("frame_crc {}", frame_crc);
Ok(buf)
}
Err(e) => Err(e)?,
}
}
// TODO remove duplication for these similar `make_*_frame` functions:
pub fn make_error_frame(error: &::err::Error) -> Result<BytesMut, Error> {
match encode_to_vec(error) {
Ok(enc) => {
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();
let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(INMEM_FRAME_ENCID);
buf.put_u32_le(ERROR_FRAME_TYPE_ID);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
//trace!("enc len {}", enc.len());
//trace!("payload_crc {}", payload_crc);
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
//trace!("frame_crc {}", frame_crc);
Ok(buf)
}
Err(e) => Err(e)?,
}
}
// TODO can I remove this usage?
pub fn make_log_frame(item: &LogItem) -> Result<BytesMut, Error> {
warn!("make_log_frame {item:?}");
match encode_to_vec(item) {
Ok(enc) => {
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();
let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(INMEM_FRAME_ENCID);
buf.put_u32_le(LOG_FRAME_TYPE_ID);
warn!("make_log_frame payload len {}", enc.len());
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
Ok(buf)
}
Err(e) => Err(e)?,
}
}
pub fn make_stats_frame(item: &StatsItem) -> Result<BytesMut, Error> {
match encode_to_vec(item) {
Ok(enc) => {
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();
let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(INMEM_FRAME_ENCID);
buf.put_u32_le(STATS_FRAME_TYPE_ID);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
Ok(buf)
}
Err(e) => Err(e)?,
}
}
pub fn make_range_complete_frame() -> Result<BytesMut, Error> {
let enc = [];
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();
let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(INMEM_FRAME_ENCID);
buf.put_u32_le(RANGE_COMPLETE_FRAME_TYPE_ID);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
Ok(buf)
}
pub fn make_term_frame() -> Result<BytesMut, Error> {
let enc = [];
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();
let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(INMEM_FRAME_ENCID);
buf.put_u32_le(TERM_FRAME_TYPE_ID);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
Ok(buf)
}
pub fn decode_frame<T>(frame: &InMemoryFrame) -> Result<T, Error>
where
T: FrameDecodable,
{
if frame.encid() != INMEM_FRAME_ENCID {
return Err(Error::with_msg(format!("unknown encoder id {:?}", frame)));
}
if frame.len() as usize != frame.buf().len() {
return Err(Error::with_msg(format!(
"buf mismatch {} vs {} in {:?}",
frame.len(),
frame.buf().len(),
frame
)));
}
if frame.tyid() == ERROR_FRAME_TYPE_ID {
let k: ::err::Error = match decode_from_slice(frame.buf()) {
Ok(item) => item,
Err(e) => {
error!("ERROR deserialize len {} ERROR_FRAME_TYPE_ID", frame.buf().len());
let n = frame.buf().len().min(128);
let s = String::from_utf8_lossy(&frame.buf()[..n]);
error!("frame.buf as string: {:?}", s);
Err(e)?
}
};
Ok(T::from_error(k))
} else if frame.tyid() == LOG_FRAME_TYPE_ID {
let k: LogItem = match decode_from_slice(frame.buf()) {
Ok(item) => item,
Err(e) => {
error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID", frame.buf().len());
let n = frame.buf().len().min(128);
let s = String::from_utf8_lossy(&frame.buf()[..n]);
error!("frame.buf as string: {:?}", s);
Err(e)?
}
};
Ok(T::from_log(k))
} else if frame.tyid() == STATS_FRAME_TYPE_ID {
let k: StatsItem = match decode_from_slice(frame.buf()) {
Ok(item) => item,
Err(e) => {
error!("ERROR deserialize len {} STATS_FRAME_TYPE_ID", frame.buf().len());
let n = frame.buf().len().min(128);
let s = String::from_utf8_lossy(&frame.buf()[..n]);
error!("frame.buf as string: {:?}", s);
Err(e)?
}
};
Ok(T::from_stats(k))
} else if frame.tyid() == RANGE_COMPLETE_FRAME_TYPE_ID {
// There is currently no content in this variant.
Ok(T::from_range_complete())
} else {
let tyid = T::FRAME_TYPE_ID;
if frame.tyid() != tyid {
Err(Error::with_msg(format!(
"type id mismatch expect {:x} found {:x} {:?}",
tyid,
frame.tyid(),
frame
)))
} else {
match decode_from_slice(frame.buf()) {
Ok(item) => Ok(item),
Err(e) => {
error!("decode_frame T = {}", std::any::type_name::<T>());
error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid());
let n = frame.buf().len().min(64);
let s = String::from_utf8_lossy(&frame.buf()[..n]);
error!("frame.buf as string: {:?}", s);
Err(e)?
}
}
}
}
}
pub fn crchex<T>(t: T) -> String
where
T: AsRef<[u8]>,
{
let mut h = crc32fast::Hasher::new();
h.update(t.as_ref());
let crc = h.finalize();
format!("{:08x}", crc)
}

View File

@@ -1,34 +0,0 @@
use bytes::Bytes;
use std::fmt;
pub struct InMemoryFrame {
pub encid: u32,
pub tyid: u32,
pub len: u32,
pub buf: Bytes,
}
impl InMemoryFrame {
pub fn encid(&self) -> u32 {
self.encid
}
pub fn tyid(&self) -> u32 {
self.tyid
}
pub fn len(&self) -> u32 {
self.len
}
pub fn buf(&self) -> &Bytes {
&self.buf
}
}
impl fmt::Debug for InMemoryFrame {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
fmt,
"InMemoryFrame {{ encid: {:x} tyid: {:x} len {} }}",
self.encid, self.tyid, self.len
)
}
}

View File

@@ -1,701 +0,0 @@
pub mod eventfull;
pub mod frame;
pub mod inmem;
pub mod streams;
use crate::frame::make_frame_2;
use bytes::BytesMut;
use chrono::TimeZone;
use chrono::Utc;
use err::Error;
use frame::make_error_frame;
use frame::make_log_frame;
use frame::make_range_complete_frame;
use frame::make_stats_frame;
use items_0::AsAnyRef;
use netpod::log::Level;
#[allow(unused)]
use netpod::log::*;
use netpod::DiskStats;
use netpod::EventDataReadStats;
use netpod::NanoRange;
use netpod::RangeFilterStats;
use netpod::Shape;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
use std::any::Any;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::fs::File;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;
pub const TERM_FRAME_TYPE_ID: u32 = 0xaa01;
pub const ERROR_FRAME_TYPE_ID: u32 = 0xaa02;
pub const SITEMTY_NONSPEC_FRAME_TYPE_ID: u32 = 0xaa04;
pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100;
pub const EVENTS_0D_FRAME_TYPE_ID: u32 = 0x500;
pub const MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID: u32 = 0x700;
pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0x800;
pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00;
pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0xb00;
pub const LOG_FRAME_TYPE_ID: u32 = 0xc00;
pub const STATS_FRAME_TYPE_ID: u32 = 0xd00;
pub const RANGE_COMPLETE_FRAME_TYPE_ID: u32 = 0xe00;
pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200;
pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300;
pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400;
pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500;
pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800;
pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900;
pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00;
pub fn bool_is_false(j: &bool) -> bool {
*j == false
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum RangeCompletableItem<T> {
RangeComplete,
Data(T),
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum StatsItem {
EventDataReadStats(EventDataReadStats),
RangeFilterStats(RangeFilterStats),
DiskStats(DiskStats),
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum StreamItem<T> {
DataItem(T),
Log(LogItem),
Stats(StatsItem),
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LogItem {
pub node_ix: u32,
#[serde(with = "levelserde")]
pub level: Level,
pub msg: String,
}
impl LogItem {
pub fn quick(level: Level, msg: String) -> Self {
Self {
level,
msg,
node_ix: 42,
}
}
}
pub type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, Error>;
#[macro_export]
macro_rules! on_sitemty_range_complete {
($item:expr, $ex:expr) => {
if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item {
$ex
}
};
}
impl<T> FrameType for Sitemty<T>
where
T: FrameType,
{
fn frame_type_id(&self) -> u32 {
match self {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => SITEMTY_NONSPEC_FRAME_TYPE_ID,
RangeCompletableItem::Data(item) => item.frame_type_id(),
},
StreamItem::Log(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID,
StreamItem::Stats(_) => SITEMTY_NONSPEC_FRAME_TYPE_ID,
},
Err(_) => ERROR_FRAME_TYPE_ID,
}
}
}
pub fn sitem_data<X>(x: X) -> Sitemty<X> {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
}
mod levelserde {
use super::Level;
use serde::de::{self, Visitor};
use serde::{Deserializer, Serializer};
use std::fmt;
pub fn serialize<S>(t: &Level, se: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let g = match *t {
Level::ERROR => 1,
Level::WARN => 2,
Level::INFO => 3,
Level::DEBUG => 4,
Level::TRACE => 5,
};
se.serialize_u32(g)
}
struct VisitLevel;
impl VisitLevel {
fn from_u32(x: u32) -> Level {
match x {
1 => Level::ERROR,
2 => Level::WARN,
3 => Level::INFO,
4 => Level::DEBUG,
5 => Level::TRACE,
_ => Level::TRACE,
}
}
}
impl<'de> Visitor<'de> for VisitLevel {
type Value = Level;
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "expect Level code")
}
fn visit_u64<E>(self, val: u64) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(VisitLevel::from_u32(val as _))
}
}
pub fn deserialize<'de, D>(de: D) -> Result<Level, D::Error>
where
D: Deserializer<'de>,
{
de.deserialize_u32(VisitLevel)
}
}
pub const INMEM_FRAME_ENCID: u32 = 0x12121212;
pub const INMEM_FRAME_HEAD: usize = 20;
pub const INMEM_FRAME_FOOT: usize = 4;
pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d;
// Required for any inner type of Sitemty.
pub trait FrameTypeInnerStatic {
const FRAME_TYPE_ID: u32;
}
// To be implemented by the T of Sitemty<T>, e.g. ScalarEvents.
pub trait FrameTypeInnerDyn {
// TODO check actual usage of this
fn frame_type_id(&self) -> u32;
}
impl<T> FrameTypeInnerDyn for T
where
T: FrameTypeInnerStatic,
{
fn frame_type_id(&self) -> u32 {
<Self as FrameTypeInnerStatic>::FRAME_TYPE_ID
}
}
pub trait FrameTypeStatic {
const FRAME_TYPE_ID: u32;
}
impl<T> FrameTypeStatic for Sitemty<T>
where
T: FrameTypeInnerStatic,
{
const FRAME_TYPE_ID: u32 = <T as FrameTypeInnerStatic>::FRAME_TYPE_ID;
}
// Framable trait objects need some inspection to handle the supposed-to-be common Err ser format:
// Meant to be implemented by Sitemty.
pub trait FrameType {
fn frame_type_id(&self) -> u32;
}
impl<T> FrameType for Box<T>
where
T: FrameType,
{
fn frame_type_id(&self) -> u32 {
self.as_ref().frame_type_id()
}
}
impl FrameTypeInnerDyn for Box<dyn TimeBinned> {
fn frame_type_id(&self) -> u32 {
FrameTypeInnerDyn::frame_type_id(self.as_time_binnable_dyn())
}
}
impl FrameTypeInnerDyn for Box<dyn EventsDyn> {
fn frame_type_id(&self) -> u32 {
FrameTypeInnerDyn::frame_type_id(self.as_time_binnable_dyn())
}
}
pub trait ContainsError {
fn is_err(&self) -> bool;
fn err(&self) -> Option<&::err::Error>;
}
impl<T> ContainsError for Box<T>
where
T: ContainsError,
{
fn is_err(&self) -> bool {
self.as_ref().is_err()
}
fn err(&self) -> Option<&::err::Error> {
self.as_ref().err()
}
}
impl<T> ContainsError for Sitemty<T> {
fn is_err(&self) -> bool {
match self {
Ok(_) => false,
Err(_) => true,
}
}
fn err(&self) -> Option<&::err::Error> {
match self {
Ok(_) => None,
Err(e) => Some(e),
}
}
}
pub trait Framable {
fn make_frame(&self) -> Result<BytesMut, Error>;
}
pub trait FramableInner: erased_serde::Serialize + FrameTypeInnerDyn + Send {
fn _dummy(&self);
}
impl<T: erased_serde::Serialize + FrameTypeInnerDyn + Send> FramableInner for T {
fn _dummy(&self) {}
}
erased_serde::serialize_trait_object!(EventsDyn);
erased_serde::serialize_trait_object!(TimeBinnableDyn);
erased_serde::serialize_trait_object!(TimeBinned);
impl<T> Framable for Sitemty<T>
where
T: Sized + serde::Serialize + FrameType,
{
fn make_frame(&self) -> Result<BytesMut, Error> {
match self {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => {
let frame_type_id = k.frame_type_id();
make_frame_2(self, frame_type_id)
}
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => make_range_complete_frame(),
Ok(StreamItem::Log(item)) => make_log_frame(item),
Ok(StreamItem::Stats(item)) => make_stats_frame(item),
Err(e) => make_error_frame(e),
}
}
}
impl<T> Framable for Box<T>
where
T: Framable + ?Sized,
{
fn make_frame(&self) -> Result<BytesMut, Error> {
self.as_ref().make_frame()
}
}
pub trait FrameDecodable: FrameTypeStatic + DeserializeOwned {
fn from_error(e: ::err::Error) -> Self;
fn from_log(item: LogItem) -> Self;
fn from_stats(item: StatsItem) -> Self;
fn from_range_complete() -> Self;
}
impl<T> FrameDecodable for Sitemty<T>
where
T: FrameTypeInnerStatic + DeserializeOwned,
{
fn from_error(e: err::Error) -> Self {
Err(e)
}
fn from_log(item: LogItem) -> Self {
Ok(StreamItem::Log(item))
}
fn from_stats(item: StatsItem) -> Self {
Ok(StreamItem::Stats(item))
}
fn from_range_complete() -> Self {
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
}
#[derive(Serialize, Deserialize)]
pub struct EventQueryJsonStringFrame(pub String);
impl FrameTypeInnerStatic for EventQueryJsonStringFrame {
const FRAME_TYPE_ID: u32 = EVENT_QUERY_JSON_STRING_FRAME;
}
impl FrameType for EventQueryJsonStringFrame {
fn frame_type_id(&self) -> u32 {
EventQueryJsonStringFrame::FRAME_TYPE_ID
}
}
#[derive(Clone, Debug, Deserialize)]
pub struct IsoDateTime(chrono::DateTime<Utc>);
impl Serialize for IsoDateTime {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string())
}
}
pub fn make_iso_ts(tss: &[u64]) -> Vec<IsoDateTime> {
tss.iter()
.map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
.collect()
}
pub enum Fits {
Empty,
Lower,
Greater,
Inside,
PartlyLower,
PartlyGreater,
PartlyLowerAndGreater,
}
pub trait WithLen {
fn len(&self) -> usize;
}
pub trait WithTimestamps {
fn ts(&self, ix: usize) -> u64;
}
pub trait ByteEstimate {
fn byte_estimate(&self) -> u64;
}
pub trait RangeOverlapInfo {
// TODO do not take by value.
fn ends_before(&self, range: NanoRange) -> bool;
fn ends_after(&self, range: NanoRange) -> bool;
fn starts_after(&self, range: NanoRange) -> bool;
}
pub trait FitsInside {
fn fits_inside(&self, range: NanoRange) -> Fits;
}
pub trait FilterFittingInside: Sized {
fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self>;
}
pub trait PushableIndex {
// TODO get rid of usage, involves copy.
// TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop?
fn push_index(&mut self, src: &Self, ix: usize);
}
pub trait NewEmpty {
fn empty(shape: Shape) -> Self;
}
pub trait Appendable: WithLen {
fn empty_like_self(&self) -> Self;
// TODO get rid of usage, involves copy.
fn append(&mut self, src: &Self);
// TODO the `ts2` makes no sense for non-bin-implementors
fn append_zero(&mut self, ts1: u64, ts2: u64);
}
pub trait Clearable {
fn clear(&mut self);
}
pub trait EventAppendable
where
Self: Sized,
{
type Value;
fn append_event(ret: Option<Self>, ts: u64, pulse: u64, value: Self::Value) -> Self;
}
pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside {
fn ts1s(&self) -> &Vec<u64>;
fn ts2s(&self) -> &Vec<u64>;
}
pub trait TimeBinnableType:
Send
+ Unpin
+ RangeOverlapInfo
+ FilterFittingInside
+ NewEmpty
+ Appendable
+ Serialize
+ DeserializeOwned
+ ReadableFromFile
+ FrameTypeInnerStatic
{
type Output: TimeBinnableType;
type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;
fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator;
}
/// Provides a time-binned representation of the implementing type.
/// In contrast to `TimeBinnableType` this is meant for trait objects.
// TODO should not require Sync!
// TODO SitemtyFrameType is already supertrait of FramableInner.
pub trait TimeBinnableDyn:
fmt::Debug
+ FramableInner
+ FrameType
+ FrameTypeInnerDyn
+ WithLen
+ RangeOverlapInfo
+ Any
+ AsAnyRef
+ Sync
+ Send
+ 'static
{
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinnerDyn>;
}
pub trait TimeBinnableDynStub:
fmt::Debug
+ FramableInner
+ FrameType
+ FrameTypeInnerDyn
+ WithLen
+ RangeOverlapInfo
+ Any
+ AsAnyRef
+ Sync
+ Send
+ 'static
{
}
// impl for the stubs TODO: remove
impl<T> TimeBinnableDyn for T
where
T: TimeBinnableDynStub,
{
fn time_binner_new(&self, _edges: Vec<u64>, _do_time_weight: bool) -> Box<dyn TimeBinnerDyn> {
error!("TODO impl time_binner_new for T {}", std::any::type_name::<T>());
err::todoval()
}
}
// TODO maybe this is no longer needed:
pub trait TimeBinnableDynAggregator: Send {
fn ingest(&mut self, item: &dyn TimeBinnableDyn);
fn result(&mut self) -> Box<dyn TimeBinned>;
}
/// Container of some form of events, for use as trait object.
pub trait EventsDyn: TimeBinnableDyn {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn;
fn verify(&self);
fn output_info(&self);
}
/// Data in time-binned form.
pub trait TimeBinned: TimeBinnableDyn {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn;
fn edges_slice(&self) -> (&[u64], &[u64]);
fn counts(&self) -> &[u64];
fn mins(&self) -> Vec<f32>;
fn maxs(&self) -> Vec<f32>;
fn avgs(&self) -> Vec<f32>;
fn validate(&self) -> Result<(), String>;
}
impl FrameType for Box<dyn TimeBinned> {
fn frame_type_id(&self) -> u32 {
FrameType::frame_type_id(self.as_ref())
}
}
impl WithLen for Box<dyn TimeBinned> {
fn len(&self) -> usize {
self.as_time_binnable_dyn().len()
}
}
impl RangeOverlapInfo for Box<dyn TimeBinned> {
fn ends_before(&self, range: NanoRange) -> bool {
self.as_time_binnable_dyn().ends_before(range)
}
fn ends_after(&self, range: NanoRange) -> bool {
self.as_time_binnable_dyn().ends_after(range)
}
fn starts_after(&self, range: NanoRange) -> bool {
self.as_time_binnable_dyn().starts_after(range)
}
}
impl TimeBinnableDyn for Box<dyn TimeBinned> {
fn time_binner_new(&self, edges: Vec<u64>, do_time_weight: bool) -> Box<dyn TimeBinnerDyn> {
self.as_time_binnable_dyn().time_binner_new(edges, do_time_weight)
}
}
// TODO should get I/O and tokio dependence out of this crate
pub trait ReadableFromFile: Sized {
fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error>;
// TODO should not need this:
fn from_buf(buf: &[u8]) -> Result<Self, Error>;
}
// TODO should get I/O and tokio dependence out of this crate
pub struct ReadPbv<T>
where
T: ReadableFromFile,
{
buf: Vec<u8>,
all: Vec<u8>,
file: Option<File>,
_m1: PhantomData<T>,
}
impl<T> ReadPbv<T>
where
T: ReadableFromFile,
{
pub fn new(file: File) -> Self {
Self {
// TODO make buffer size a parameter:
buf: vec![0; 1024 * 32],
all: vec![],
file: Some(file),
_m1: PhantomData,
}
}
}
impl<T> Future for ReadPbv<T>
where
T: ReadableFromFile + Unpin,
{
type Output = Result<StreamItem<RangeCompletableItem<T>>, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
let mut buf = std::mem::replace(&mut self.buf, Vec::new());
let ret = 'outer: loop {
let mut dst = ReadBuf::new(&mut buf);
if dst.remaining() == 0 || dst.capacity() == 0 {
break Ready(Err(Error::with_msg("bad read buffer")));
}
let fp = self.file.as_mut().unwrap();
let f = Pin::new(fp);
break match File::poll_read(f, cx, &mut dst) {
Ready(res) => match res {
Ok(_) => {
if dst.filled().len() > 0 {
self.all.extend_from_slice(dst.filled());
continue 'outer;
} else {
match T::from_buf(&mut self.all) {
Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))),
Err(e) => Ready(Err(e)),
}
}
}
Err(e) => Ready(Err(e.into())),
},
Pending => Pending,
};
};
self.buf = buf;
ret
}
}
pub trait TimeBinnableTypeAggregator: Send {
type Input: TimeBinnableType;
type Output: TimeBinnableType;
fn range(&self) -> &NanoRange;
fn ingest(&mut self, item: &Self::Input);
// TODO this API is too convoluted for a minimal performance gain: should separate `result` and `reset`
// or simply require to construct a new which is almost equally expensive.
fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output;
}
pub trait TimestampInspectable: WithTimestamps + WithLen {}
impl<T> TimestampInspectable for T where T: WithTimestamps + WithLen {}
pub fn inspect_timestamps(events: &dyn TimestampInspectable, range: NanoRange) -> String {
use fmt::Write;
let rd = range.delta();
let mut buf = String::new();
let n = events.len();
for i in 0..n {
if i < 3 || i > (n - 4) {
let ts = events.ts(i);
let z = ts - range.beg;
let z = z as f64 / rd as f64 * 2.0 - 1.0;
write!(&mut buf, "i {:3} tt {:6.3}\n", i, z).unwrap();
}
}
buf
}
pub trait TimeBinnerDyn: Send {
fn bins_ready_count(&self) -> usize;
fn bins_ready(&mut self) -> Option<Box<dyn TimeBinned>>;
fn ingest(&mut self, item: &dyn TimeBinnableDyn);
/// If there is a bin in progress with non-zero count, push it to the result set.
/// With push_empty == true, a bin in progress is pushed even if it contains no counts.
fn push_in_progress(&mut self, push_empty: bool);
/// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call
/// to `push_in_progress` did not change the result count, as long as edges are left.
/// The next call to `Self::bins_ready_count` must return one higher count than before.
fn cycle(&mut self);
}

163
items/src/lib.rs Normal file
View File

@@ -0,0 +1,163 @@
pub mod streams;
use err::Error;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StreamItem;
#[allow(unused)]
use netpod::log::*;
use netpod::NanoRange;
use netpod::Shape;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::fs::File;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;
pub enum Fits {
Empty,
Lower,
Greater,
Inside,
PartlyLower,
PartlyGreater,
PartlyLowerAndGreater,
}
pub trait WithLen {
fn len(&self) -> usize;
}
pub trait WithTimestamps {
fn ts(&self, ix: usize) -> u64;
}
pub trait ByteEstimate {
fn byte_estimate(&self) -> u64;
}
pub trait RangeOverlapInfo {
// TODO do not take by value.
fn ends_before(&self, range: NanoRange) -> bool;
fn ends_after(&self, range: NanoRange) -> bool;
fn starts_after(&self, range: NanoRange) -> bool;
}
pub trait FitsInside {
fn fits_inside(&self, range: NanoRange) -> Fits;
}
pub trait FilterFittingInside: Sized {
fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self>;
}
pub trait PushableIndex {
// TODO get rid of usage, involves copy.
// TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop?
fn push_index(&mut self, src: &Self, ix: usize);
}
pub trait NewEmpty {
fn empty(shape: Shape) -> Self;
}
pub trait Appendable: WithLen {
fn empty_like_self(&self) -> Self;
// TODO get rid of usage, involves copy.
fn append(&mut self, src: &Self);
// TODO the `ts2` makes no sense for non-bin-implementors
fn append_zero(&mut self, ts1: u64, ts2: u64);
}
pub trait Clearable {
fn clear(&mut self);
}
pub trait EventAppendable
where
Self: Sized,
{
type Value;
fn append_event(ret: Option<Self>, ts: u64, pulse: u64, value: Self::Value) -> Self;
}
pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside {
fn ts1s(&self) -> &Vec<u64>;
fn ts2s(&self) -> &Vec<u64>;
}
// TODO should get I/O and tokio dependence out of this crate
pub trait ReadableFromFile: Sized {
fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error>;
// TODO should not need this:
fn from_buf(buf: &[u8]) -> Result<Self, Error>;
}
// TODO should get I/O and tokio dependence out of this crate
pub struct ReadPbv<T>
where
T: ReadableFromFile,
{
buf: Vec<u8>,
all: Vec<u8>,
file: Option<File>,
_m1: PhantomData<T>,
}
impl<T> ReadPbv<T>
where
T: ReadableFromFile,
{
pub fn new(file: File) -> Self {
Self {
// TODO make buffer size a parameter:
buf: vec![0; 1024 * 32],
all: vec![],
file: Some(file),
_m1: PhantomData,
}
}
}
impl<T> Future for ReadPbv<T>
where
T: ReadableFromFile + Unpin,
{
type Output = Result<StreamItem<RangeCompletableItem<T>>, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
let mut buf = std::mem::replace(&mut self.buf, Vec::new());
let ret = 'outer: loop {
let mut dst = ReadBuf::new(&mut buf);
if dst.remaining() == 0 || dst.capacity() == 0 {
break Ready(Err(Error::with_msg("bad read buffer")));
}
let fp = self.file.as_mut().unwrap();
let f = Pin::new(fp);
break match File::poll_read(f, cx, &mut dst) {
Ready(res) => match res {
Ok(_) => {
if dst.filled().len() > 0 {
self.all.extend_from_slice(dst.filled());
continue 'outer;
} else {
match T::from_buf(&mut self.all) {
Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))),
Err(e) => Ready(Err(e)),
}
}
}
Err(e) => Ready(Err(e.into())),
},
Pending => Pending,
};
};
self.buf = buf;
ret
}
}

View File

@@ -1,7 +1,13 @@
use crate::{RangeCompletableItem, Sitemty, StreamItem, WithLen};
use crate::RangeCompletableItem;
use crate::StreamItem;
use crate::WithLen;
use err::Error;
use futures_util::{Stream, StreamExt};
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StatsItem;
use netpod::log::*;
use netpod::DiskStats;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::fmt;
@@ -98,8 +104,6 @@ where
}
}
StreamItem::Stats(item) => {
use crate::StatsItem;
use netpod::DiskStats;
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}