Remove crate items

This commit is contained in:
Dominik Werder
2023-03-22 09:38:19 +01:00
parent c0bdc854ff
commit d1c10e1712
46 changed files with 598 additions and 557 deletions

View File

@@ -1,5 +1,5 @@
[workspace]
members = ["daqbuffer", "httpret", "h5out", "items", "items_2", "items_proc", "nodenet", "httpclient", "dq"]
members = ["daqbuffer", "httpret", "h5out", "items_proc", "nodenet", "httpclient", "dq"]
[profile.release]
opt-level = 1

View File

@@ -31,7 +31,6 @@ httpclient = { path = "../httpclient" }
disk = { path = "../disk" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
items = { path = "../items" }
streams = { path = "../streams" }
[dev-dependencies]

View File

@@ -28,4 +28,3 @@ err = { path = "../err" }
netpod = { path = "../netpod" }
parse = { path = "../parse" }
taskrun = { path = "../taskrun" }
items = { path = "../items" }

View File

@@ -40,7 +40,6 @@ query = { path = "../query" }
bitshuffle = { path = "../bitshuffle" }
dbconn = { path = "../dbconn" }
parse = { path = "../parse" }
items = { path = "../items" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
streams = { path = "../streams" }

View File

@@ -5,6 +5,7 @@ use netpod::ChannelConfig;
use netpod::NodeConfigCached;
use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::ChannelConfigs;
use parse::channelconfig::MatchingConfigEntry;
pub async fn config(
@@ -12,14 +13,24 @@ pub async fn config(
channel: Channel,
node_config: &NodeConfigCached,
) -> Result<ChannelConfig, Error> {
let channel_config = read_local_config(channel.clone(), node_config.node.clone()).await?;
let entry_res = match extract_matching_config_entry(&range, &channel_config) {
let channel_configs = read_local_config(channel.clone(), node_config.node.clone()).await?;
let entry_res = match extract_matching_config_entry(&range, &channel_configs) {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?,
MatchingConfigEntry::None => {
return Err(Error::with_public_msg(format!(
"disk::channelconfig no config entry found for {:?}",
channel
)))?
}
MatchingConfigEntry::Multiple => {
return Err(Error::with_public_msg(format!(
"disk::channelconfig multiple config entries in range found for {:?}",
channel
)))?
}
MatchingConfigEntry::Entry(entry) => entry,
};
let shape = match entry.to_shape() {
@@ -30,7 +41,7 @@ pub async fn config(
channel: channel.clone(),
keyspace: entry.ks as u8,
time_bin_size: entry.bs.clone(),
shape: shape,
shape,
scalar_type: entry.scalar_type.clone(),
byte_order: entry.byte_order.clone(),
array: entry.is_array,
@@ -38,3 +49,7 @@ pub async fn config(
};
Ok(channel_config)
}
pub async fn configs(channel: Channel, node_config: &NodeConfigCached) -> Result<ChannelConfigs, Error> {
read_local_config(channel.clone(), node_config.node.clone()).await
}

View File

@@ -8,6 +8,7 @@ use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::WithLen;
use items_2::eventfull::EventFull;
use items_2::merger::Merger;
use netpod::log::*;
@@ -93,9 +94,7 @@ impl Stream for EventChunkerMultifile {
type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
//tracing::field::DebugValue;
let span1 = span!(Level::INFO, "EvChMul", node_ix = self.node_ix);
//span1.record("node_ix", &self.node_ix);
let _spg = span1.enter();
use Poll::*;
'outer: loop {
@@ -117,7 +116,7 @@ impl Stream for EventChunkerMultifile {
Ready(Some(Ok(k))) => {
let k = if let StreamItem::DataItem(RangeCompletableItem::Data(h)) = k {
let mut h: EventFull = h;
if h.tss.len() > 0 {
if h.len() > 0 {
let min = h.tss.iter().fold(u64::MAX, |a, &x| a.min(x));
let max = h.tss.iter().fold(u64::MIN, |a, &x| a.max(x));
if min <= self.max_ts {
@@ -131,7 +130,7 @@ impl Stream for EventChunkerMultifile {
"EventChunkerMultifile emit {}/{} events {}",
self.emit_count,
after,
h.tss.len()
h.len()
);
self.emit_count += 1;
}
@@ -224,9 +223,6 @@ impl Stream for EventChunkerMultifile {
self.expand,
self.do_decompress,
);
let chunker = chunker
//.map(|x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))
;
chunkers.push(Box::pin(chunker) as _);
}
}
@@ -270,6 +266,7 @@ mod test {
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StreamItem;
use items_0::WithLen;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::DAY;
@@ -328,7 +325,7 @@ mod test {
RangeCompletableItem::Data(item) => {
// TODO assert more
debug!("item: {:?}", item.tss.iter().map(|x| x / MS).collect::<Vec<_>>());
event_count += item.tss.len();
event_count += item.len();
for ts in item.tss {
tss.push(ts);
}

View File

@@ -64,11 +64,13 @@ pub async fn make_event_pipe(
}
}
let range = evq.range().clone();
let channel_config = match read_local_config(evq.channel().clone(), node_config.node.clone()).await {
Ok(k) => k,
let channel_config =
crate::channelconfig::config(evq.range().try_into()?, evq.channel().clone(), node_config).await;
let channel_config = match channel_config {
Ok(x) => x,
Err(e) => {
// TODO introduce detailed error type
if e.msg().contains("ErrorKind::NotFound") {
warn!("{e}");
let s = futures_util::stream::empty();
return Ok(Box::pin(s));
} else {
@@ -76,29 +78,6 @@ pub async fn make_event_pipe(
}
}
};
let entry_res = match extract_matching_config_entry(&(&range).try_into()?, &channel_config) {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?,
MatchingConfigEntry::Entry(entry) => entry,
};
let shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let channel_config = netpod::ChannelConfig {
channel: evq.channel().clone(),
keyspace: entry.ks as u8,
time_bin_size: entry.bs.clone(),
shape,
scalar_type: entry.scalar_type.clone(),
byte_order: entry.byte_order.clone(),
array: entry.is_array,
compression: entry.is_compressed,
};
trace!(
"make_event_pipe need_expand {need_expand} {evq:?}",
need_expand = evq.one_before_range()
@@ -122,14 +101,10 @@ pub async fn make_event_pipe(
true,
out_max_len,
);
let shape = entry.to_shape()?;
let scalar_type = channel_config.scalar_type.clone();
let shape = channel_config.shape.clone();
error!("TODO replace AggKind in the called code");
let pipe = make_num_pipeline_stream_evs(
entry.scalar_type.clone(),
shape.clone(),
AggKind::TimeWeightedScalar,
event_blobs,
);
let pipe = make_num_pipeline_stream_evs(scalar_type, shape.clone(), AggKind::TimeWeightedScalar, event_blobs);
Ok(pipe)
}
@@ -138,14 +113,24 @@ pub async fn get_applicable_entry(
channel: Channel,
node_config: &NodeConfigCached,
) -> Result<ConfigEntry, Error> {
let channel_config = read_local_config(channel, node_config.node.clone()).await?;
let channel_config = read_local_config(channel.clone(), node_config.node.clone()).await?;
let entry_res = match extract_matching_config_entry(range, &channel_config) {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?,
MatchingConfigEntry::None => {
return Err(Error::with_public_msg(format!(
"get_applicable_entry no config entry found {:?}",
channel
)))?
}
MatchingConfigEntry::Multiple => {
return Err(Error::with_public_msg(format!(
"get_applicable_entry multiple config entries found for {:?}",
channel
)))?
}
MatchingConfigEntry::Entry(entry) => entry,
};
Ok(entry.clone())
@@ -259,7 +244,21 @@ pub async fn make_event_blobs_pipe(
}
let expand = evq.one_before_range();
let range = evq.range();
let entry = get_applicable_entry(&evq.range().try_into()?, evq.channel().clone(), node_config).await?;
let entry = match get_applicable_entry(&evq.range().try_into()?, evq.channel().clone(), node_config).await {
Ok(x) => x,
Err(e) => {
if e.to_public_error().msg().contains("no config entry found") {
let item = items_0::streamitem::LogItem {
node_ix: node_config.ix as _,
level: Level::WARN,
msg: format!("{} {}", node_config.node.host, e),
};
return Ok(Box::pin(stream::iter([Ok(StreamItem::Log(item))])));
} else {
return Err(e);
}
}
};
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
type ItemType = Sitemty<EventFull>;
// TODO should depend on host config

View File

@@ -16,7 +16,6 @@ bytes = "1.0.1"
err = { path = "../err" }
taskrun = { path = "../taskrun" }
netpod = { path = "../netpod" }
items = { path = "../items" }
parse = { path = "../parse" }
disk = { path = "../disk" }
streams = { path = "../streams" }

View File

@@ -112,7 +112,7 @@ impl Error {
reason: self.reason(),
msg: self
.public_msg()
.map(|k| k.join("\n"))
.map(|k| k.join("; "))
.unwrap_or("No error message".into()),
}
}
@@ -167,8 +167,8 @@ impl fmt::Debug for Error {
};
write!(fmt, "msg: {}", self.msg)?;
if let Some(msgs) = self.public_msg() {
for msg in msgs {
write!(fmt, "\npublic: {}", msg)?;
for (i, msg) in msgs.iter().enumerate() {
write!(fmt, "; pub({i}): {msg}")?;
}
}
if !trace_str.is_empty() {

View File

@@ -29,7 +29,6 @@ netpod = { path = "../netpod" }
query = { path = "../query" }
dbconn = { path = "../dbconn" }
disk = { path = "../disk" }
items = { path = "../items" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
parse = { path = "../parse" }

View File

@@ -8,6 +8,7 @@ use bytes::BufMut;
use bytes::BytesMut;
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
use disk::raw::conn::make_local_event_blobs_stream;
use futures_util::stream;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -22,6 +23,7 @@ use hyper::Response;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::WithLen;
use items_2::eventfull::EventFull;
use itertools::Itertools;
use netpod::log::*;
@@ -43,7 +45,7 @@ use netpod::APP_JSON;
use netpod::APP_OCTET;
use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::Config;
use parse::channelconfig::ChannelConfigs;
use parse::channelconfig::ConfigEntry;
use parse::channelconfig::MatchingConfigEntry;
use query::api4::events::PlainEventsQuery;
@@ -641,7 +643,7 @@ pub struct DataApiPython3DataStream {
node_config: NodeConfigCached,
chan_ix: usize,
chan_stream: Option<Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>>,
config_fut: Option<Pin<Box<dyn Future<Output = Result<Config, Error>> + Send>>>,
config_fut: Option<Pin<Box<dyn Future<Output = Result<ChannelConfigs, Error>> + Send>>>,
disk_io_tune: DiskIoTune,
do_decompress: bool,
#[allow(unused)]
@@ -689,7 +691,7 @@ impl DataApiPython3DataStream {
count_events: &mut usize,
) -> Result<BytesMut, Error> {
let mut d = BytesMut::new();
for i1 in 0..b.tss.len() {
for i1 in 0..b.len() {
const EVIMAX: usize = 6;
if *count_events < EVIMAX {
debug!(
@@ -811,85 +813,94 @@ impl Stream for DataApiPython3DataStream {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let entry = match entry_res {
match entry_res {
MatchingConfigEntry::None => {
return Err(Error::with_public_msg("no config entry found"))?
warn!("DataApiPython3DataStream no config entry found for {:?}", config);
self.chan_stream = Some(Box::pin(stream::empty()));
continue;
}
MatchingConfigEntry::Multiple => {
return Err(Error::with_public_msg("multiple config entries found"))?
warn!(
"DataApiPython3DataStream multiple config entries found for {:?}",
config
);
self.chan_stream = Some(Box::pin(stream::empty()));
continue;
}
MatchingConfigEntry::Entry(entry) => entry.clone(),
};
let channel = self.channels[self.chan_ix - 1].clone();
debug!("found channel_config for {}: {:?}", channel.name, entry);
let evq = PlainEventsQuery::new(channel, self.range.clone()).for_event_blobs();
info!("query for event blobs retrieval: evq {evq:?}");
warn!("fix magic inmem_bufcap");
let perf_opts = PerfOpts::default();
// TODO is this a good to place decide this?
let s = if self.node_config.node_config.cluster.is_central_storage {
info!("Set up central storage stream");
// TODO pull up this config
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let s = make_local_event_blobs_stream(
evq.range().try_into()?,
evq.channel().clone(),
&entry,
evq.one_before_range(),
self.do_decompress,
event_chunker_conf,
self.disk_io_tune.clone(),
&self.node_config,
)?;
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
} else {
if let Some(sh) = &entry.shape {
if sh.len() > 1 {
warn!("Remote stream fetch for shape {sh:?}");
}
}
debug!("Set up merged remote stream");
let s = MergedBlobsFromRemotes::new(
evq,
perf_opts,
self.node_config.node_config.cluster.clone(),
);
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
};
let s = s.map({
let mut header_out = false;
let mut count_events = 0;
let channel = self.channels[self.chan_ix - 1].clone();
move |b| {
let ret = match b {
Ok(b) => {
let f = match b {
StreamItem::DataItem(RangeCompletableItem::Data(b)) => {
Self::convert_item(
b,
&channel,
&entry,
&mut header_out,
&mut count_events,
)?
}
_ => BytesMut::new(),
};
Ok(f)
MatchingConfigEntry::Entry(entry) => {
let entry = entry.clone();
let channel = self.channels[self.chan_ix - 1].clone();
debug!("found channel_config for {}: {:?}", channel.name, entry);
let evq = PlainEventsQuery::new(channel, self.range.clone()).for_event_blobs();
info!("query for event blobs retrieval: evq {evq:?}");
warn!("fix magic inmem_bufcap");
let perf_opts = PerfOpts::default();
// TODO is this a good to place decide this?
let s = if self.node_config.node_config.cluster.is_central_storage {
info!("Set up central storage stream");
// TODO pull up this config
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let s = make_local_event_blobs_stream(
evq.range().try_into()?,
evq.channel().clone(),
&entry,
evq.one_before_range(),
self.do_decompress,
event_chunker_conf,
self.disk_io_tune.clone(),
&self.node_config,
)?;
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
} else {
if let Some(sh) = &entry.shape {
if sh.len() > 1 {
warn!("Remote stream fetch for shape {sh:?}");
}
}
Err(e) => Err(e),
debug!("Set up merged remote stream");
let s = MergedBlobsFromRemotes::new(
evq,
perf_opts,
self.node_config.node_config.cluster.clone(),
);
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
};
ret
let s = s.map({
let mut header_out = false;
let mut count_events = 0;
let channel = self.channels[self.chan_ix - 1].clone();
move |b| {
let ret = match b {
Ok(b) => {
let f = match b {
StreamItem::DataItem(RangeCompletableItem::Data(b)) => {
Self::convert_item(
b,
&channel,
&entry,
&mut header_out,
&mut count_events,
)?
}
_ => BytesMut::new(),
};
Ok(f)
}
Err(e) => Err(e),
};
ret
}
});
//let _ = Box::new(s) as Box<dyn Stream<Item = Result<BytesMut, Error>> + Unpin>;
let evm = if self.events_max == 0 {
usize::MAX
} else {
self.events_max as usize
};
self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm)));
continue;
}
});
//let _ = Box::new(s) as Box<dyn Stream<Item = Result<BytesMut, Error>> + Unpin>;
let evm = if self.events_max == 0 {
usize::MAX
} else {
self.events_max as usize
};
self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm)));
continue;
}
}
Ready(Err(e)) => {
self.config_fut = None;

View File

@@ -1,12 +1,17 @@
use crate::bodystream::{response, ToPublicResponse};
use crate::bodystream::response;
use crate::bodystream::ToPublicResponse;
use crate::Error;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use http::{Method, Request, Response};
use hyper::Body;
use netpod::log::*;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
use netpod::{ChannelSearchQuery, NodeConfigCached};
use netpod::{ACCEPT_ALL, APP_JSON};
use netpod::NodeConfigCached;
use netpod::ACCEPT_ALL;
use netpod::APP_JSON;
use url::Url;
pub async fn channel_search(req: Request<Body>, node_config: &NodeConfigCached) -> Result<ChannelSearchResult, Error> {

View File

@@ -127,6 +127,65 @@ impl ChannelConfigHandler {
}
}
pub struct ChannelConfigsHandler {}
impl ChannelConfigsHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/channel/configs" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
match self.channel_configs(req, &node_config).await {
Ok(k) => Ok(k),
Err(e) => {
warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}");
Ok(e.to_public_response())
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
async fn channel_configs(
&self,
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
info!("channel_configs");
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ChannelConfigQuery::from_url(&url)?;
info!("channel_configs for q {q:?}");
let conf = if let Some(_) = &node_config.node_config.cluster.scylla {
return Err(Error::with_msg_no_trace("TODO"));
} else if let Some(_) = &node_config.node.channel_archiver {
return Err(Error::with_msg_no_trace("TODO"));
} else if let Some(_) = &node_config.node.archiver_appliance {
return Err(Error::with_msg_no_trace("TODO"));
} else {
disk::channelconfig::configs(q.channel, node_config).await?
};
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&conf)?))?;
Ok(ret)
}
}
trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
}

View File

@@ -1,24 +1,25 @@
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde::Serialize;
use std::fmt;
#[derive(Serialize, Deserialize)]
pub struct Error(pub ::err::Error);
pub struct Error(pub err::Error);
impl Error {
pub fn with_msg<S: Into<String>>(s: S) -> Self {
Self(::err::Error::with_msg(s))
Self(err::Error::with_msg(s))
}
pub fn with_msg_no_trace<S: Into<String>>(s: S) -> Self {
Self(::err::Error::with_msg_no_trace(s))
Self(err::Error::with_msg_no_trace(s))
}
pub fn with_public_msg<S: Into<String>>(s: S) -> Self {
Self(::err::Error::with_public_msg(s))
Self(err::Error::with_public_msg(s))
}
pub fn with_public_msg_no_trace<S: Into<String>>(s: S) -> Self {
Self(::err::Error::with_public_msg_no_trace(s))
Self(err::Error::with_public_msg_no_trace(s))
}
pub fn msg(&self) -> &str {
@@ -52,13 +53,13 @@ impl fmt::Display for Error {
impl std::error::Error for Error {}
impl From<::err::Error> for Error {
fn from(x: ::err::Error) -> Self {
impl From<err::Error> for Error {
fn from(x: err::Error) -> Self {
Self(x)
}
}
impl From<Error> for ::err::Error {
impl From<Error> for err::Error {
fn from(x: Error) -> Self {
x.0
}

View File

@@ -30,6 +30,7 @@ use hyper::Body;
use hyper::Request;
use hyper::Response;
use net::SocketAddr;
use netpod::is_false;
use netpod::log::*;
use netpod::query::prebinned::PreBinnedQuery;
use netpod::NodeConfigCached;
@@ -297,6 +298,8 @@ async fn http_service_inner(
h.handle(req, &node_config).await
} else if let Some(h) = api4::binned::BinnedHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ChannelConfigsHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) {
@@ -607,9 +610,9 @@ pub struct StatusBoardEntry {
ts_created: SystemTime,
#[serde(serialize_with = "instant_serde::ser")]
ts_updated: SystemTime,
#[serde(skip_serializing_if = "items_2::bool_is_false")]
#[serde(skip_serializing_if = "is_false")]
is_error: bool,
#[serde(skip_serializing_if = "items_2::bool_is_false")]
#[serde(skip_serializing_if = "is_false")]
is_ok: bool,
#[serde(skip_serializing_if = "Vec::is_empty")]
errors: Vec<Error>,

View File

@@ -1,22 +0,0 @@
[package]
name = "items"
version = "0.1.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
futures-util = "0.3.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
ciborium = "0.2"
bson = "2.4.0"
erased-serde = "0.3"
bytes = "1.2.1"
num-traits = "0.2.15"
chrono = { version = "0.4.22", features = ["serde"] }
crc32fast = "1.3.2"
err = { path = "../err" }
items_proc = { path = "../items_proc" }
items_0 = { path = "../items_0" }
netpod = { path = "../netpod" }

View File

@@ -1,107 +0,0 @@
use err::Error;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StreamItem;
use netpod::range::evrange::NanoRange;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::fs::File;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;
pub trait WithTimestamps {
fn ts(&self, ix: usize) -> u64;
}
pub trait ByteEstimate {
fn byte_estimate(&self) -> u64;
}
pub trait RangeOverlapInfo {
// TODO do not take by value.
fn ends_before(&self, range: NanoRange) -> bool;
fn ends_after(&self, range: NanoRange) -> bool;
fn starts_after(&self, range: NanoRange) -> bool;
}
pub trait EventAppendable
where
Self: Sized,
{
type Value;
fn append_event(ret: Option<Self>, ts: u64, pulse: u64, value: Self::Value) -> Self;
}
// TODO should get I/O and tokio dependence out of this crate
trait ReadableFromFile: Sized {
fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error>;
// TODO should not need this:
fn from_buf(buf: &[u8]) -> Result<Self, Error>;
}
// TODO should get I/O and tokio dependence out of this crate
struct ReadPbv<T>
where
T: ReadableFromFile,
{
buf: Vec<u8>,
all: Vec<u8>,
file: Option<File>,
_m1: PhantomData<T>,
}
impl<T> ReadPbv<T>
where
T: ReadableFromFile,
{
fn new(file: File) -> Self {
Self {
// TODO make buffer size a parameter:
buf: vec![0; 1024 * 32],
all: Vec::new(),
file: Some(file),
_m1: PhantomData,
}
}
}
impl<T> Future for ReadPbv<T>
where
T: ReadableFromFile + Unpin,
{
type Output = Result<StreamItem<RangeCompletableItem<T>>, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
let mut buf = std::mem::replace(&mut self.buf, Vec::new());
let ret = 'outer: loop {
let mut dst = ReadBuf::new(&mut buf);
if dst.remaining() == 0 || dst.capacity() == 0 {
break Ready(Err(Error::with_msg("bad read buffer")));
}
let fp = self.file.as_mut().unwrap();
let f = Pin::new(fp);
break match File::poll_read(f, cx, &mut dst) {
Ready(res) => match res {
Ok(_) => {
if dst.filled().len() > 0 {
self.all.extend_from_slice(dst.filled());
continue 'outer;
} else {
match T::from_buf(&mut self.all) {
Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))),
Err(e) => Ready(Err(e)),
}
}
}
Err(e) => Ready(Err(e.into())),
},
Pending => Pending,
};
};
self.buf = buf;
ret
}
}

11
items_0/src/container.rs Normal file
View File

@@ -0,0 +1,11 @@
use crate::Events;
pub trait ByteEstimate {
fn byte_estimate(&self) -> u64;
}
impl ByteEstimate for Box<dyn Events> {
fn byte_estimate(&self) -> u64 {
self.as_ref().byte_estimate()
}
}

View File

@@ -1,10 +1,10 @@
pub mod collect_s;
pub mod container;
pub mod framable;
pub mod isodate;
pub mod scalar_ops;
pub mod streamitem;
pub mod subfr;
pub mod transform;
pub mod bincode {
pub use bincode::*;
@@ -12,6 +12,7 @@ pub mod bincode {
use collect_s::Collectable;
use collect_s::ToJsonResult;
use container::ByteEstimate;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
use std::any::Any;
@@ -135,7 +136,16 @@ impl From<MergeError> for err::Error {
/// Container of some form of events, for use as trait object.
pub trait Events:
fmt::Debug + TypeName + Any + Collectable + TimeBinnable + WithLen + Send + erased_serde::Serialize + EventsNonObj
fmt::Debug
+ TypeName
+ Any
+ Collectable
+ TimeBinnable
+ WithLen
+ ByteEstimate
+ Send
+ erased_serde::Serialize
+ EventsNonObj
{
fn as_time_binnable(&self) -> &dyn TimeBinnable;
fn verify(&self) -> bool;
@@ -183,22 +193,22 @@ pub struct TransformProperties {
pub needs_value: bool,
}
pub trait Transformer {
pub trait EventTransform {
fn query_transform_properties(&self) -> TransformProperties;
}
impl<T> Transformer for Box<T>
impl<T> EventTransform for Box<T>
where
T: Transformer,
T: EventTransform,
{
fn query_transform_properties(&self) -> TransformProperties {
self.as_ref().query_transform_properties()
}
}
impl<T> Transformer for std::pin::Pin<Box<T>>
impl<T> EventTransform for std::pin::Pin<Box<T>>
where
T: Transformer,
T: EventTransform,
{
fn query_transform_properties(&self) -> TransformProperties {
self.as_ref().query_transform_properties()

View File

@@ -60,7 +60,7 @@ impl AsPrimF32 for String {
}
pub trait ScalarOps:
fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static
fmt::Debug + Clone + PartialOrd + PartialEq + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static
{
fn zero_b() -> Self;
fn equal_slack(&self, rhs: &Self) -> bool;

View File

@@ -142,38 +142,4 @@ mod levelserde {
}
}
pub trait ContainsError {
fn is_err(&self) -> bool;
fn err(&self) -> Option<&::err::Error>;
}
impl<T> ContainsError for Box<T>
where
T: ContainsError,
{
fn is_err(&self) -> bool {
self.as_ref().is_err()
}
fn err(&self) -> Option<&::err::Error> {
self.as_ref().err()
}
}
impl<T> ContainsError for Sitemty<T> {
fn is_err(&self) -> bool {
match self {
Ok(_) => false,
Err(_) => true,
}
}
fn err(&self) -> Option<&::err::Error> {
match self {
Ok(_) => None,
Err(e) => Some(e),
}
}
}
erased_serde::serialize_trait_object!(TimeBinned);

View File

@@ -22,7 +22,6 @@ tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] }
humantime-serde = "1.1.1"
rmp-serde = "1.1.1"
err = { path = "../err" }
items = { path = "../items" }
items_0 = { path = "../items_0" }
items_proc = { path = "../items_proc" }
netpod = { path = "../netpod" }

View File

@@ -11,9 +11,9 @@ use items_0::collect_s::ToJsonResult;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::EventTransform;
use items_0::TimeBinnable;
use items_0::TimeBinner;
use items_0::Transformer;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
@@ -69,7 +69,7 @@ fn _old_binned_collected(
scalar_type: ScalarType,
shape: Shape,
binrange: BinnedRangeEnum,
transformer: &dyn Transformer,
transformer: &dyn EventTransform,
deadline: Instant,
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
) -> Result<BinnedCollectedResult, Error> {

View File

@@ -18,10 +18,12 @@ use items_0::TimeBinned;
use items_0::TimeBinner;
use items_0::TimeBins;
use items_0::WithLen;
use netpod::is_false;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::SEC;
use netpod::BinnedRangeEnum;
use netpod::CmpZero;
use netpod::Dim0Kind;
use serde::Deserialize;
use serde::Serialize;
@@ -303,11 +305,11 @@ pub struct BinsDim0CollectedResult<NTY> {
maxs: VecDeque<NTY>,
#[serde(rename = "avgs")]
avgs: VecDeque<f32>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "missingBins", default, skip_serializing_if = "crate::is_zero_u32")]
#[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")]
missing_bins: u32,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,

View File

@@ -21,13 +21,14 @@ use items_0::TimeBinned;
use items_0::TimeBinner;
use items_0::TimeBins;
use items_0::WithLen;
use netpod::is_false;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::SEC;
use netpod::BinnedRangeEnum;
use netpod::Dim0Kind;
use num_traits::Zero;
use netpod::CmpZero;
use serde::Deserialize;
use serde::Serialize;
use std::any;
@@ -269,11 +270,11 @@ pub struct BinsXbinDim0CollectedResult<NTY> {
maxs: VecDeque<NTY>,
#[serde(rename = "avgs")]
avgs: VecDeque<f32>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
#[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")]
missing_bins: u32,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,

View File

@@ -1,10 +1,10 @@
use crate::framable::FrameType;
use crate::merger;
use crate::merger::Mergeable;
use crate::Events;
use items_0::collect_s::Collectable;
use items_0::collect_s::Collected;
use items_0::collect_s::Collector;
use items_0::container::ByteEstimate;
use items_0::framable::FrameTypeInnerStatic;
use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
use items_0::AsAnyMut;
@@ -12,9 +12,7 @@ use items_0::AsAnyRef;
use items_0::MergeError;
use items_0::WithLen;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use serde::Deserialize;
use serde::Serialize;
@@ -55,6 +53,13 @@ impl ConnStatusEvent {
}
}
impl ByteEstimate for ConnStatusEvent {
fn byte_estimate(&self) -> u64 {
// TODO magic number, but maybe good enough
32
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ChannelStatus {
Connect,
@@ -86,6 +91,13 @@ impl ChannelStatusEvent {
}
}
impl ByteEstimate for ChannelStatusEvent {
fn byte_estimate(&self) -> u64 {
// TODO magic number, but maybe good enough
32
}
}
/// Events on a channel consist not only of e.g. timestamped values, but can be also
/// connection status changes.
#[derive(Debug)]
@@ -490,6 +502,18 @@ impl WithLen for ChannelEvents {
}
}
impl ByteEstimate for ChannelEvents {
fn byte_estimate(&self) -> u64 {
match self {
ChannelEvents::Events(k) => k.byte_estimate(),
ChannelEvents::Status(k) => match k {
Some(k) => k.byte_estimate(),
None => 0,
},
}
}
}
impl Mergeable for ChannelEvents {
fn ts_min(&self) -> Option<u64> {
match self {

View File

@@ -1,8 +1,7 @@
use crate::framable::FrameType;
use crate::merger::Mergeable;
use bytes::BytesMut;
use items::ByteEstimate;
use items::WithTimestamps;
use items_0::container::ByteEstimate;
use items_0::framable::FrameTypeInnerStatic;
use items_0::streamitem::EVENT_FULL_FRAME_TYPE_ID;
use items_0::Empty;
@@ -29,6 +28,7 @@ pub struct EventFull {
pub be: VecDeque<bool>,
pub shapes: VecDeque<Shape>,
pub comps: VecDeque<Option<CompressionMethod>>,
pub entry_payload_max: u64,
}
#[allow(unused)]
@@ -81,6 +81,9 @@ impl EventFull {
shape: Shape,
comp: Option<CompressionMethod>,
) {
let m1 = blob.as_ref().map_or(0, |x| x.len());
let m2 = decomp.as_ref().map_or(0, |x| x.len());
self.entry_payload_max = self.entry_payload_max.max(m1 as u64 + m2 as u64);
self.tss.push_back(ts);
self.pulses.push_back(pulse);
self.blobs.push_back(blob);
@@ -91,6 +94,7 @@ impl EventFull {
self.comps.push_back(comp);
}
// TODO possible to get rid of this?
pub fn truncate_ts(&mut self, end: u64) {
let mut nkeep = usize::MAX;
for (i, &ts) in self.tss.iter().enumerate() {
@@ -131,6 +135,7 @@ impl Empty for EventFull {
be: VecDeque::new(),
shapes: VecDeque::new(),
comps: VecDeque::new(),
entry_payload_max: 0,
}
}
}
@@ -141,22 +146,9 @@ impl WithLen for EventFull {
}
}
impl WithTimestamps for EventFull {
fn ts(&self, ix: usize) -> u64 {
self.tss[ix]
}
}
impl ByteEstimate for EventFull {
fn byte_estimate(&self) -> u64 {
if self.len() == 0 {
0
} else {
// TODO that is clumsy... it assumes homogenous types.
// TODO improve via a const fn on NTY
let decomp_len = self.decomps[0].as_ref().map_or(0, |h| h.len());
self.tss.len() as u64 * (40 + self.blobs[0].as_ref().map_or(0, |x| x.len()) as u64 + decomp_len as u64)
}
self.len() as u64 * (64 + self.entry_payload_max)
}
}
@@ -176,6 +168,13 @@ impl Mergeable for EventFull {
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> {
// TODO make it harder to forget new members when the struct may get modified in the future
let r = range.0..range.1;
let mut max = dst.entry_payload_max;
for i in r.clone() {
let m1 = self.blobs[i].as_ref().map_or(0, |x| x.len());
let m2 = self.decomps[i].as_ref().map_or(0, |x| x.len());
max = max.max(m1 as u64 + m2 as u64);
}
dst.entry_payload_max = max;
dst.tss.extend(self.tss.drain(r.clone()));
dst.pulses.extend(self.pulses.drain(r.clone()));
dst.blobs.extend(self.blobs.drain(r.clone()));

View File

@@ -10,6 +10,7 @@ use items_0::collect_s::Collector;
use items_0::collect_s::CollectorType;
use items_0::collect_s::ToJsonBytes;
use items_0::collect_s::ToJsonResult;
use items_0::container::ByteEstimate;
use items_0::scalar_ops::ScalarOps;
use items_0::Appendable;
use items_0::AsAnyMut;
@@ -21,6 +22,7 @@ use items_0::MergeError;
use items_0::TimeBinnable;
use items_0::TimeBinner;
use items_0::WithLen;
use netpod::is_false;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
@@ -125,6 +127,13 @@ impl<NTY> WithLen for EventsDim0<NTY> {
}
}
impl<STY> ByteEstimate for EventsDim0<STY> {
fn byte_estimate(&self) -> u64 {
let stylen = mem::size_of::<STY>();
(self.len() * (8 + 8 + stylen)) as u64
}
}
impl<NTY: ScalarOps> RangeOverlapInfo for EventsDim0<NTY> {
fn ends_before(&self, range: &SeriesRange) -> bool {
if range.is_time() {
@@ -238,9 +247,9 @@ pub struct EventsDim0CollectorOutput<NTY> {
pulse_off: VecDeque<u64>,
#[serde(rename = "values")]
values: VecDeque<NTY>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
@@ -1122,7 +1131,7 @@ mod test_frame {
use items_0::streamitem::StreamItem;
#[test]
fn events_bincode() {
fn events_serialize() {
taskrun::tracing_init().unwrap();
let mut events = EventsDim0::empty();
events.push(123, 234, 55f32);

View File

@@ -10,6 +10,7 @@ use items_0::collect_s::Collected;
use items_0::collect_s::CollectorType;
use items_0::collect_s::ToJsonBytes;
use items_0::collect_s::ToJsonResult;
use items_0::container::ByteEstimate;
use items_0::scalar_ops::ScalarOps;
use items_0::Appendable;
use items_0::AsAnyMut;
@@ -21,6 +22,7 @@ use items_0::MergeError;
use items_0::TimeBinnable;
use items_0::TimeBinner;
use items_0::WithLen;
use netpod::is_false;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::SEC;
@@ -31,6 +33,7 @@ use std::any;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use std::mem;
#[allow(unused)]
macro_rules! trace2 {
@@ -130,6 +133,14 @@ impl<NTY> WithLen for EventsDim1<NTY> {
}
}
impl<STY> ByteEstimate for EventsDim1<STY> {
fn byte_estimate(&self) -> u64 {
let stylen = mem::size_of::<STY>();
let n = self.values.front().map_or(0, Vec::len);
(self.len() * (8 + 8 + n * stylen)) as u64
}
}
impl<NTY: ScalarOps> RangeOverlapInfo for EventsDim1<NTY> {
fn ends_before(&self, range: &SeriesRange) -> bool {
todo!()
@@ -199,9 +210,9 @@ pub struct EventsDim1CollectorOutput<NTY> {
pulse_off: VecDeque<u64>,
#[serde(rename = "values")]
values: VecDeque<Vec<NTY>>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,

View File

@@ -1,4 +1,5 @@
use crate::binsxbindim0::BinsXbinDim0;
use items_0::container::ByteEstimate;
use crate::IsoDateTime;
use crate::RangeOverlapInfo;
use crate::TimeBinnableType;
@@ -14,6 +15,7 @@ use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::WithLen;
use netpod::is_false;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
@@ -69,7 +71,7 @@ where
}
}
impl<NTY> items::ByteEstimate for EventsXbinDim0<NTY> {
impl<NTY> ByteEstimate for EventsXbinDim0<NTY> {
fn byte_estimate(&self) -> u64 {
todo!("byte_estimate")
}
@@ -365,9 +367,9 @@ pub struct EventsXbinDim0CollectorOutput<NTY> {
maxs: VecDeque<NTY>,
#[serde(rename = "avgs")]
avgs: VecDeque<f32>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,

View File

@@ -90,7 +90,7 @@ where
}
pub trait FrameDecodable: FrameTypeStatic + DeserializeOwned {
fn from_error(e: ::err::Error) -> Self;
fn from_error(e: err::Error) -> Self;
fn from_log(item: LogItem) -> Self;
fn from_stats(item: StatsItem) -> Self;
fn from_range_complete() -> Self;
@@ -148,3 +148,34 @@ where
}
}
}
#[test]
fn test_frame_log() {
use crate::channelevents::ChannelEvents;
use crate::frame::decode_from_slice;
use netpod::log::Level;
let item = LogItem {
node_ix: 123,
level: Level::TRACE,
msg: format!("test-log-message"),
};
let item: Sitemty<ChannelEvents> = Ok(StreamItem::Log(item));
let buf = Framable::make_frame(&item).unwrap();
let len = u32::from_le_bytes(buf[12..16].try_into().unwrap());
let item2: LogItem = decode_from_slice(&buf[20..20 + len as usize]).unwrap();
}
#[test]
fn test_frame_error() {
use crate::channelevents::ChannelEvents;
use crate::frame::decode_from_slice;
let item: Sitemty<ChannelEvents> = Err(Error::with_msg_no_trace(format!("dummy-error-message")));
let buf = Framable::make_frame(&item).unwrap();
let len = u32::from_le_bytes(buf[12..16].try_into().unwrap());
let tyid = u32::from_le_bytes(buf[8..12].try_into().unwrap());
if tyid != ERROR_FRAME_TYPE_ID {
panic!("bad tyid");
}
eprintln!("buf len {} len {}", buf.len(), len);
let item2: Error = decode_from_slice(&buf[20..20 + len as usize]).unwrap();
}

View File

@@ -1,5 +1,4 @@
use crate::framable::FrameDecodable;
use crate::framable::FrameType;
use crate::framable::INMEM_FRAME_ENCID;
use crate::framable::INMEM_FRAME_HEAD;
use crate::framable::INMEM_FRAME_MAGIC;
@@ -15,7 +14,6 @@ use bytes::BufMut;
use bytes::BytesMut;
use err::Error;
use items_0::bincode;
use items_0::streamitem::ContainsError;
use items_0::streamitem::LogItem;
use items_0::streamitem::StatsItem;
use items_0::streamitem::ERROR_FRAME_TYPE_ID;
@@ -23,9 +21,10 @@ use items_0::streamitem::LOG_FRAME_TYPE_ID;
use items_0::streamitem::RANGE_COMPLETE_FRAME_TYPE_ID;
use items_0::streamitem::STATS_FRAME_TYPE_ID;
use items_0::streamitem::TERM_FRAME_TYPE_ID;
#[allow(unused)]
use netpod::log::*;
use serde::Serialize;
use std::any;
use std::io;
trait EC {
fn ec(self) -> err::Error;
@@ -43,17 +42,6 @@ impl EC for rmp_serde::decode::Error {
}
}
pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
where
FT: FrameType + ContainsError + Serialize,
{
if item.is_err() {
make_error_frame(item.err().unwrap())
} else {
make_frame_2(item, item.frame_type_id())
}
}
pub fn bincode_ser<W>(
w: W,
) -> bincode::Serializer<
@@ -64,7 +52,7 @@ pub fn bincode_ser<W>(
>,
>
where
W: std::io::Write,
W: io::Write,
{
use bincode::Options;
let opts = DefaultOptions::new()
@@ -98,73 +86,83 @@ where
<T as serde::Deserialize>::deserialize(&mut de).map_err(|e| format!("{e}").into())
}
pub fn encode_to_vec<S>(item: S) -> Result<Vec<u8>, Error>
pub fn msgpack_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
S: Serialize,
T: Serialize,
{
if false {
serde_json::to_vec(&item).map_err(|e| e.into())
} else {
bincode_to_vec(&item)
}
rmp_serde::to_vec_named(&item).map_err(|e| format!("{e}").into())
}
pub fn msgpack_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: erased_serde::Serialize,
{
let mut out = Vec::new();
let mut ser1 = rmp_serde::Serializer::new(&mut out).with_struct_map();
let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser1);
item.erased_serialize(&mut ser2)
.map_err(|e| Error::from(format!("{e}")))?;
Ok(out)
}
pub fn msgpack_from_slice<T>(buf: &[u8]) -> Result<T, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
rmp_serde::from_slice(buf).map_err(|e| format!("{e}").into())
}
pub fn encode_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: Serialize,
{
msgpack_to_vec(item)
}
pub fn encode_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: erased_serde::Serialize,
{
msgpack_erased_to_vec(item)
}
pub fn decode_from_slice<T>(buf: &[u8]) -> Result<T, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
if false {
serde_json::from_slice(buf).map_err(|e| e.into())
} else {
bincode_from_slice(buf)
}
msgpack_from_slice(buf)
}
pub fn make_frame_2<T>(item: &T, fty: u32) -> Result<BytesMut, Error>
pub fn make_frame_2<T>(item: T, fty: u32) -> Result<BytesMut, Error>
where
T: erased_serde::Serialize,
{
let mut out = Vec::new();
//let mut ser = rmp_serde::Serializer::new(&mut out).with_struct_map();
//let writer = ciborium::ser::into_writer(&item, &mut out).unwrap();
let mut ser = bincode_ser(&mut out);
let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser);
//let mut ser = serde_json::Serializer::new(&mut out);
//let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser);
match item.erased_serialize(&mut ser2) {
Ok(_) => {
let enc = out;
if enc.len() > u32::MAX as usize {
return Err(Error::with_msg(format!("too long payload {}", enc.len())));
}
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();
// TODO reserve also for footer via constant
let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(INMEM_FRAME_ENCID);
buf.put_u32_le(fty);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
//trace!("enc len {}", enc.len());
//trace!("payload_crc {}", payload_crc);
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
//trace!("frame_crc {}", frame_crc);
Ok(buf)
}
Err(e) => Err(e)?,
let enc = encode_erased_to_vec(item)?;
if enc.len() > u32::MAX as usize {
return Err(Error::with_msg(format!("too long payload {}", enc.len())));
}
let mut h = crc32fast::Hasher::new();
h.update(&enc);
let payload_crc = h.finalize();
// TODO reserve also for footer via constant
let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD);
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(INMEM_FRAME_ENCID);
buf.put_u32_le(fty);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
return Ok(buf);
}
// TODO remove duplication for these similar `make_*_frame` functions:
pub fn make_error_frame(error: &::err::Error) -> Result<BytesMut, Error> {
pub fn make_error_frame(error: &err::Error) -> Result<BytesMut, Error> {
match encode_to_vec(error) {
Ok(enc) => {
let mut h = crc32fast::Hasher::new();
@@ -176,24 +174,18 @@ pub fn make_error_frame(error: &::err::Error) -> Result<BytesMut, Error> {
buf.put_u32_le(ERROR_FRAME_TYPE_ID);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
//trace!("enc len {}", enc.len());
//trace!("payload_crc {}", payload_crc);
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let frame_crc = h.finalize();
buf.put_u32_le(frame_crc);
//trace!("frame_crc {}", frame_crc);
Ok(buf)
}
Err(e) => Err(e)?,
}
}
// TODO can I remove this usage?
pub fn make_log_frame(item: &LogItem) -> Result<BytesMut, Error> {
warn!("make_log_frame {item:?}");
match encode_to_vec(item) {
Ok(enc) => {
let mut h = crc32fast::Hasher::new();
@@ -203,10 +195,8 @@ pub fn make_log_frame(item: &LogItem) -> Result<BytesMut, Error> {
buf.put_u32_le(INMEM_FRAME_MAGIC);
buf.put_u32_le(INMEM_FRAME_ENCID);
buf.put_u32_le(LOG_FRAME_TYPE_ID);
warn!("make_log_frame payload len {}", enc.len());
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
@@ -230,7 +220,6 @@ pub fn make_stats_frame(item: &StatsItem) -> Result<BytesMut, Error> {
buf.put_u32_le(STATS_FRAME_TYPE_ID);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
@@ -253,7 +242,6 @@ pub fn make_range_complete_frame() -> Result<BytesMut, Error> {
buf.put_u32_le(RANGE_COMPLETE_FRAME_TYPE_ID);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
@@ -273,7 +261,6 @@ pub fn make_term_frame() -> Result<BytesMut, Error> {
buf.put_u32_le(TERM_FRAME_TYPE_ID);
buf.put_u32_le(enc.len() as u32);
buf.put_u32_le(payload_crc);
// TODO add padding to align to 8 bytes.
buf.put(enc.as_ref());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
@@ -298,11 +285,15 @@ where
)));
}
if frame.tyid() == ERROR_FRAME_TYPE_ID {
let k: ::err::Error = match decode_from_slice(frame.buf()) {
let k: err::Error = match decode_from_slice(frame.buf()) {
Ok(item) => item,
Err(e) => {
error!("ERROR deserialize len {} ERROR_FRAME_TYPE_ID", frame.buf().len());
let n = frame.buf().len().min(128);
error!(
"ERROR deserialize len {} ERROR_FRAME_TYPE_ID {}",
frame.buf().len(),
e
);
let n = frame.buf().len().min(256);
let s = String::from_utf8_lossy(&frame.buf()[..n]);
error!("frame.buf as string: {:?}", s);
Err(e)?
@@ -313,7 +304,7 @@ where
let k: LogItem = match decode_from_slice(frame.buf()) {
Ok(item) => item,
Err(e) => {
error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID", frame.buf().len());
error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID {}", frame.buf().len(), e);
let n = frame.buf().len().min(128);
let s = String::from_utf8_lossy(&frame.buf()[..n]);
error!("frame.buf as string: {:?}", s);
@@ -325,7 +316,11 @@ where
let k: StatsItem = match decode_from_slice(frame.buf()) {
Ok(item) => item,
Err(e) => {
error!("ERROR deserialize len {} STATS_FRAME_TYPE_ID", frame.buf().len());
error!(
"ERROR deserialize len {} STATS_FRAME_TYPE_ID {}",
frame.buf().len(),
e
);
let n = frame.buf().len().min(128);
let s = String::from_utf8_lossy(&frame.buf()[..n]);
error!("frame.buf as string: {:?}", s);
@@ -349,7 +344,7 @@ where
match decode_from_slice(frame.buf()) {
Ok(item) => Ok(item),
Err(e) => {
error!("decode_frame T = {}", std::any::type_name::<T>());
error!("decode_frame T = {}", any::type_name::<T>());
error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid());
let n = frame.buf().len().min(64);
let s = String::from_utf8_lossy(&frame.buf()[..n]);

View File

@@ -29,7 +29,6 @@ use items_0::Events;
use items_0::MergeError;
use items_0::RangeOverlapInfo;
use merger::Mergeable;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::*;
use serde::Deserialize;
@@ -38,14 +37,6 @@ use serde::Serializer;
use std::collections::VecDeque;
use std::fmt;
pub fn bool_is_false(x: &bool) -> bool {
*x == false
}
pub fn is_zero_u32(x: &u32) -> bool {
*x == 0
}
pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque<u64>, VecDeque<u64>) {
let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC;
let ts_anchor_ns = ts_anchor_sec * SEC;
@@ -208,9 +199,9 @@ pub trait TimeBinnableTypeAggregator: Send {
fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output;
}
pub trait ChannelEventsInput: Stream<Item = Sitemty<ChannelEvents>> + items_0::Transformer + Send {}
pub trait ChannelEventsInput: Stream<Item = Sitemty<ChannelEvents>> + items_0::EventTransform + Send {}
impl<T> ChannelEventsInput for T where T: Stream<Item = Sitemty<ChannelEvents>> + items_0::Transformer + Send {}
impl<T> ChannelEventsInput for T where T: Stream<Item = Sitemty<ChannelEvents>> + items_0::EventTransform + Send {}
pub fn runfut<T, F>(fut: F) -> Result<T, err::Error>
where

View File

@@ -2,6 +2,7 @@ pub use crate::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::container::ByteEstimate;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
@@ -16,6 +17,8 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
const OUT_MAX_BYTES: u64 = 1024 * 200;
#[allow(unused)]
macro_rules! trace2 {
(__$($arg:tt)*) => ();
@@ -34,7 +37,7 @@ macro_rules! trace4 {
($($arg:tt)*) => (trace!($($arg)*));
}
pub trait Mergeable<Rhs = Self>: fmt::Debug + WithLen + Unpin {
pub trait Mergeable<Rhs = Self>: fmt::Debug + WithLen + ByteEstimate + Unpin {
fn ts_min(&self) -> Option<u64>;
fn ts_max(&self) -> Option<u64>;
fn new_empty(&self) -> Self;
@@ -316,7 +319,7 @@ where
if let Some(o) = self.out.as_ref() {
// A good threshold varies according to scalar type and shape.
// TODO replace this magic number by a bound on the bytes estimate.
if o.len() >= self.out_max_len || self.do_clear_out {
if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out {
trace3!("decide to output");
self.do_clear_out = false;
Break(Ready(Some(Ok(self.out.take().unwrap()))))
@@ -409,7 +412,7 @@ where
}
}
impl<T> items_0::Transformer for Merger<T> {
impl<T> items_0::EventTransform for Merger<T> {
fn query_transform_properties(&self) -> items_0::TransformProperties {
todo!()
}

View File

@@ -2,8 +2,8 @@ use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::EventTransform;
use items_0::TransformProperties;
use items_0::Transformer;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
@@ -17,7 +17,7 @@ pub struct Enumerate2<T> {
impl<T> Enumerate2<T> {
pub fn new(inp: T) -> Self
where
T: Transformer,
T: EventTransform,
{
Self { inp, cnt: 0 }
}
@@ -43,7 +43,7 @@ where
}
}
impl<T> Transformer for Enumerate2<T> {
impl<T> EventTransform for Enumerate2<T> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
@@ -114,7 +114,7 @@ where
}
}
impl<T, F, Fut> Transformer for Then2<T, F, Fut> {
impl<T, F, Fut> EventTransform for Then2<T, F, Fut> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
@@ -123,11 +123,11 @@ impl<T, F, Fut> Transformer for Then2<T, F, Fut> {
pub trait TransformerExt {
fn enumerate2(self) -> Enumerate2<Self>
where
Self: Transformer + Sized;
Self: EventTransform + Sized;
fn then2<F, Fut>(self, f: F) -> Then2<Self, F, Fut>
where
Self: Transformer + Stream + Sized,
Self: EventTransform + Stream + Sized,
F: Fn(<Self as Stream>::Item) -> Fut,
Fut: Future;
}
@@ -135,14 +135,14 @@ pub trait TransformerExt {
impl<T> TransformerExt for T {
fn enumerate2(self) -> Enumerate2<Self>
where
Self: Transformer + Sized,
Self: EventTransform + Sized,
{
Enumerate2::new(self)
}
fn then2<F, Fut>(self, f: F) -> Then2<Self, F, Fut>
where
Self: Transformer + Stream + Sized,
Self: EventTransform + Stream + Sized,
F: Fn(<Self as Stream>::Item) -> Fut,
Fut: Future,
{
@@ -178,7 +178,7 @@ where
}
}
impl<T> Transformer for VecStream<T> {
impl<T> EventTransform for VecStream<T> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}

View File

@@ -4,6 +4,10 @@ pub mod range;
pub mod status;
pub mod streamext;
pub mod log {
pub use tracing::{self, debug, error, event, info, span, trace, warn, Level};
}
use crate::log::*;
use bytes::Bytes;
use chrono::DateTime;
@@ -48,6 +52,16 @@ where
*x.borrow() == false
}
pub trait CmpZero {
fn is_zero(&self) -> bool;
}
impl CmpZero for u32 {
fn is_zero(&self) -> bool {
*self == 0
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AggQuerySingleChannel {
pub channel_config: ChannelConfig,
@@ -1055,9 +1069,28 @@ pub trait Dim0Index: Clone + fmt::Debug + PartialOrd {
fn to_binned_range_enum(&self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum;
}
#[derive(Clone, Serialize, Deserialize, PartialEq, PartialOrd)]
#[derive(Clone, Deserialize, PartialEq, PartialOrd)]
pub struct TsNano(pub u64);
mod ts_nano_ser {
use super::TsNano;
use crate::timeunits::SEC;
use chrono::TimeZone;
use chrono::Utc;
use serde::Serialize;
impl Serialize for TsNano {
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32);
let value = format!("{}", ts.earliest().unwrap());
ser.serialize_newtype_struct("TsNano", &value)
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)]
pub struct PulseId(u64);
@@ -1074,7 +1107,9 @@ impl TsNano {
impl fmt::Debug for TsNano {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32);
f.debug_struct("TsNano").field("ns", &ts).finish()
f.debug_struct("TsNano")
.field("ts", &ts.earliest().unwrap_or(Default::default()))
.finish()
}
}
@@ -1855,11 +1890,6 @@ where
}
}
pub mod log {
#[allow(unused_imports)]
pub use tracing::{self, debug, error, event, info, span, trace, warn, Level};
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct EventDataReadStats {
pub parsed_bytes: u64,

View File

@@ -4,9 +4,6 @@ version = "0.0.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/nodenet.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -29,7 +26,6 @@ netpod = { path = "../netpod" }
query = { path = "../query" }
disk = { path = "../disk" }
#parse = { path = "../parse" }
items = { path = "../items" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
dbconn = { path = "../dbconn" }

View File

@@ -376,14 +376,9 @@ async fn events_conn_handler_inner(
match events_conn_handler_inner_try(stream, addr, node_config).await {
Ok(_) => (),
Err(ce) => {
error!("events_conn_handler_inner sees error {:?}", ce.err);
// Try to pass the error over the network.
// If that fails, give error to the caller.
let mut out = ce.netout;
let e = ce.err;
let buf = make_error_frame(&e)?;
//type T = StreamItem<items::RangeCompletableItem<items::scalarevents::ScalarEvents<u32>>>;
//let buf = Err::<T, _>(e).make_frame()?;
let item: Sitemty<ChannelEvents> = Err(ce.err);
let buf = Framable::make_frame(&item)?;
out.write_all(&buf).await?;
}
}

View File

@@ -1,6 +1,6 @@
use crate::conn::events_conn_handler;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::sitem_data;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::streamitem::ERROR_FRAME_TYPE_ID;
@@ -9,11 +9,10 @@ use items_0::streamitem::LOG_FRAME_TYPE_ID;
use items_0::streamitem::STATS_FRAME_TYPE_ID;
use items_2::channelevents::ChannelEvents;
use items_2::framable::EventQueryJsonStringFrame;
use items_2::framable::Framable;
use items_2::frame::decode_frame;
use items_2::frame::make_frame;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::SEC;
use netpod::AggKind;
use netpod::Channel;
use netpod::Cluster;
use netpod::Database;
@@ -21,9 +20,9 @@ use netpod::FileIoBufferSize;
use netpod::Node;
use netpod::NodeConfig;
use netpod::NodeConfigCached;
use netpod::PerfOpts;
use netpod::SfDatabuffer;
use query::api4::events::PlainEventsQuery;
use std::time::Duration;
use streams::frames::inmem::InMemoryFrameAsyncReadStream;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
@@ -83,15 +82,15 @@ fn raw_data_00() {
};
let qu = PlainEventsQuery::new(channel, range);
let query = EventQueryJsonStringFrame(serde_json::to_string(&qu).unwrap());
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(query)));
let frame = make_frame(&item).unwrap();
let frame = sitem_data(query).make_frame()?;
let jh = taskrun::spawn(events_conn_handler(client, addr, cfg));
con.write_all(&frame).await.unwrap();
eprintln!("written");
con.shutdown().await.unwrap();
eprintln!("shut down");
let mut frames = InMemoryFrameAsyncReadStream::new(con, 1024 * 128);
let perf_opts = PerfOpts::default();
let mut frames = InMemoryFrameAsyncReadStream::new(con, perf_opts.inmem_bufcap);
while let Some(frame) = frames.next().await {
match frame {
Ok(frame) => match frame {

View File

@@ -80,7 +80,7 @@ impl CompressionMethod {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ConfigEntry {
pub ts: u64,
pub ts: TsNano,
pub pulse: i64,
pub ks: i32,
pub bs: TsNano,
@@ -130,8 +130,8 @@ impl ConfigEntry {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelConfigs {
pub format_version: i16,
pub channel_name: String,
pub entries: Vec<ConfigEntry>,
@@ -238,7 +238,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
Ok((
inp_e,
Some(ConfigEntry {
ts: ts as u64,
ts: TsNano::from_ns(ts as u64),
pulse,
ks,
bs,
@@ -267,7 +267,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
/**
Parse a complete configuration file from given in-memory input buffer.
*/
pub fn parse_config(inp: &[u8]) -> NRes<Config> {
pub fn parse_config(inp: &[u8]) -> NRes<ChannelConfigs> {
let (inp, ver) = be_i16(inp)?;
let (inp, len1) = be_i32(inp)?;
if len1 <= 8 || len1 > 500 {
@@ -294,7 +294,7 @@ pub fn parse_config(inp: &[u8]) -> NRes<Config> {
return mkerr(format!("channelName utf8 error {:?}", e));
}
};
let ret = Config {
let ret = ChannelConfigs {
format_version: ver,
channel_name: channel_name,
entries: entries,
@@ -320,7 +320,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result<Chann
}
// TODO can I take parameters as ref, even when used in custom streams?
pub async fn read_local_config(channel: Channel, node: Node) -> Result<Config, Error> {
pub async fn read_local_config(channel: Channel, node: Node) -> Result<ChannelConfigs, Error> {
let path = node
.sf_databuffer
.as_ref()
@@ -360,18 +360,18 @@ pub enum MatchingConfigEntry<'a> {
pub fn extract_matching_config_entry<'a>(
range: &NanoRange,
channel_config: &'a Config,
channel_config: &'a ChannelConfigs,
) -> Result<MatchingConfigEntry<'a>, Error> {
let mut ixs = vec![];
let mut ixs = Vec::new();
for i1 in 0..channel_config.entries.len() {
let e1 = &channel_config.entries[i1];
if i1 + 1 < channel_config.entries.len() {
let e2 = &channel_config.entries[i1 + 1];
if e1.ts < range.end && e2.ts >= range.beg {
if e1.ts.ns() < range.end && e2.ts.ns() >= range.beg {
ixs.push(i1);
}
} else {
if e1.ts < range.end {
if e1.ts.ns() < range.end {
ixs.push(i1);
}
}
@@ -397,7 +397,7 @@ mod test {
let path = "../resources/sf-daqbuf-33-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config";
//let path = "../resources/sf-daqbuf-21-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config";
let mut f1 = std::fs::File::open(path).unwrap();
let mut buf = vec![];
let mut buf = Vec::new();
f1.read_to_end(&mut buf).unwrap();
buf
}
@@ -415,8 +415,8 @@ mod test {
assert_eq!(config.format_version, 0);
assert_eq!(config.entries.len(), 18);
for e in &config.entries {
assert!(e.ts >= 631152000000000000);
assert!(e.ts <= 1613640673424172164);
assert!(e.ts.ns() >= 631152000000000000);
assert!(e.ts.ns() <= 1613640673424172164);
assert!(e.shape.is_some());
}
}

View File

@@ -9,7 +9,7 @@ use std::collections::BTreeMap;
use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum EventTransform {
enum EventTransformQuery {
EventBlobsVerbatim,
EventBlobsUncompressed,
ValueFull,
@@ -20,7 +20,7 @@ pub enum EventTransform {
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TimeBinningTransform {
enum TimeBinningTransformQuery {
None,
TimeWeighted,
Unweighted,
@@ -28,8 +28,8 @@ pub enum TimeBinningTransform {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TransformQuery {
event: EventTransform,
time_binning: TimeBinningTransform,
event: EventTransformQuery,
time_binning: TimeBinningTransformQuery,
}
impl TransformQuery {
@@ -39,15 +39,15 @@ impl TransformQuery {
pub fn default_events() -> Self {
Self {
event: EventTransform::ValueFull,
time_binning: TimeBinningTransform::None,
event: EventTransformQuery::ValueFull,
time_binning: TimeBinningTransformQuery::None,
}
}
pub fn default_time_binned() -> Self {
Self {
event: EventTransform::MinMaxAvgDev,
time_binning: TimeBinningTransform::TimeWeighted,
event: EventTransformQuery::MinMaxAvgDev,
time_binning: TimeBinningTransformQuery::TimeWeighted,
}
}
@@ -61,28 +61,30 @@ impl TransformQuery {
pub fn for_event_blobs() -> Self {
Self {
event: EventTransform::EventBlobsVerbatim,
time_binning: TimeBinningTransform::None,
event: EventTransformQuery::EventBlobsVerbatim,
time_binning: TimeBinningTransformQuery::None,
}
}
pub fn for_time_weighted_scalar() -> Self {
Self {
event: EventTransform::MinMaxAvgDev,
time_binning: TimeBinningTransform::TimeWeighted,
event: EventTransformQuery::MinMaxAvgDev,
time_binning: TimeBinningTransformQuery::TimeWeighted,
}
}
pub fn is_event_blobs(&self) -> bool {
match &self.event {
EventTransform::EventBlobsVerbatim => true,
EventTransform::EventBlobsUncompressed => {
EventTransformQuery::EventBlobsVerbatim => true,
EventTransformQuery::EventBlobsUncompressed => {
error!("TODO decide on uncompressed event blobs");
panic!()
}
_ => false,
}
}
pub fn build_event_transform(&self) -> () {}
}
impl FromUrl for TransformQuery {
@@ -97,35 +99,35 @@ impl FromUrl for TransformQuery {
if let Some(s) = pairs.get(key) {
let ret = if s == "eventBlobs" {
TransformQuery {
event: EventTransform::EventBlobsVerbatim,
time_binning: TimeBinningTransform::None,
event: EventTransformQuery::EventBlobsVerbatim,
time_binning: TimeBinningTransformQuery::None,
}
} else if s == "fullValue" {
TransformQuery {
event: EventTransform::ValueFull,
time_binning: TimeBinningTransform::None,
event: EventTransformQuery::ValueFull,
time_binning: TimeBinningTransformQuery::None,
}
} else if s == "timeWeightedScalar" {
TransformQuery {
event: EventTransform::MinMaxAvgDev,
time_binning: TimeBinningTransform::TimeWeighted,
event: EventTransformQuery::MinMaxAvgDev,
time_binning: TimeBinningTransformQuery::TimeWeighted,
}
} else if s == "unweightedScalar" {
TransformQuery {
event: EventTransform::EventBlobsVerbatim,
time_binning: TimeBinningTransform::None,
event: EventTransformQuery::EventBlobsVerbatim,
time_binning: TimeBinningTransformQuery::None,
}
} else if s == "binnedX" {
let _u: usize = pairs.get("binnedXcount").map_or("1", |k| k).parse()?;
warn!("TODO binnedXcount");
TransformQuery {
event: EventTransform::MinMaxAvgDev,
time_binning: TimeBinningTransform::None,
event: EventTransformQuery::MinMaxAvgDev,
time_binning: TimeBinningTransformQuery::None,
}
} else if s == "pulseIdDiff" {
TransformQuery {
event: EventTransform::PulseIdDiff,
time_binning: TimeBinningTransform::None,
event: EventTransformQuery::PulseIdDiff,
time_binning: TimeBinningTransformQuery::None,
}
} else {
return Err(Error::with_msg("can not extract binningScheme"));
@@ -141,8 +143,8 @@ impl FromUrl for TransformQuery {
})
.unwrap_or(None);
let ret = TransformQuery {
event: EventTransform::EventBlobsVerbatim,
time_binning: TimeBinningTransform::None,
event: EventTransformQuery::EventBlobsVerbatim,
time_binning: TimeBinningTransformQuery::None,
};
Ok(ret)
}

View File

@@ -6,7 +6,6 @@ edition = "2021"
[dependencies]
tokio = { version = "1.21.2", features = ["io-util", "net", "time", "sync", "fs"] }
tracing = "0.1.26"
futures-util = "0.3.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -21,7 +20,6 @@ wasmer = { version = "3.1.1", default-features = false, features = ["sys", "cran
err = { path = "../err" }
netpod = { path = "../netpod" }
query = { path = "../query" }
items = { path = "../items" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
parse = { path = "../parse" }

View File

@@ -15,15 +15,20 @@ use std::task::Poll;
pub struct EventsFromFrames<O> {
inp: Pin<Box<dyn Stream<Item = Result<StreamItem<InMemoryFrame>, Error>> + Send>>,
dbgdesc: String,
errored: bool,
completed: bool,
_m1: PhantomData<O>,
}
impl<O> EventsFromFrames<O> {
pub fn new(inp: Pin<Box<dyn Stream<Item = Result<StreamItem<InMemoryFrame>, Error>> + Send>>) -> Self {
pub fn new(
inp: Pin<Box<dyn Stream<Item = Result<StreamItem<InMemoryFrame>, Error>> + Send>>,
dbgdesc: String,
) -> Self {
Self {
inp,
dbgdesc,
errored: false,
completed: false,
_m1: PhantomData,
@@ -39,7 +44,8 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let span = netpod::log::span!(netpod::log::Level::INFO, "EvFrFr");
let span = span!(Level::INFO, "EvFrFr", id = tracing::field::Empty);
span.record("id", &self.dbgdesc);
let _spg = span.enter();
loop {
break if self.completed {
@@ -50,12 +56,22 @@ where
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(item))) => match item {
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Log(item) => {
info!("{} {:?} {}", item.node_ix, item.level, item.msg);
Ready(Some(Ok(StreamItem::Log(item))))
}
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(frame) => match decode_frame::<Sitemty<O>>(&frame) {
Ok(item) => match item {
Ok(item) => Ready(Some(Ok(item))),
Ok(item) => match item {
StreamItem::Log(k) => {
info!("rcvd log: {} {:?} {}", k.node_ix, k.level, k.msg);
Ready(Some(Ok(StreamItem::Log(k))))
}
item => Ready(Some(Ok(item))),
},
Err(e) => {
error!("rcvd err: {}", e);
self.errored = true;
Ready(Some(Err(e)))
}

View File

@@ -12,12 +12,10 @@ use futures_util::Stream;
use futures_util::StreamExt;
use items_0::framable::FrameTypeInnerStatic;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_2::eventfull::EventFull;
use items_2::framable::EventQueryJsonStringFrame;
use items_2::frame::make_frame;
use items_2::framable::Framable;
use items_2::frame::make_term_frame;
use netpod::log::*;
use netpod::Cluster;
@@ -36,25 +34,21 @@ pub async fn x_processed_event_blobs_stream_from_node(
perf_opts: PerfOpts,
node: Node,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
debug!(
"x_processed_event_blobs_stream_from_node to: {}:{}",
node.host, node.port_raw
);
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
let addr = format!("{}:{}", node.host, node.port_raw);
debug!("x_processed_event_blobs_stream_from_node to: {addr}",);
let net = TcpStream::connect(addr.clone()).await?;
let qjs = serde_json::to_string(&query)?;
let (netin, mut netout) = net.into_split();
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(
EventQueryJsonStringFrame(qjs),
)));
let buf = make_frame(&item)?;
let item = sitem_data(EventQueryJsonStringFrame(qjs));
let buf = item.make_frame()?;
netout.write_all(&buf).await?;
let buf = make_term_frame()?;
netout.write_all(&buf).await?;
netout.flush().await?;
netout.forget();
let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
let frames = Box::pin(frames) as _;
let items = EventsFromFrames::new(frames);
let frames = Box::pin(frames);
let items = EventsFromFrames::new(frames, addr);
Ok(Box::pin(items))
}
@@ -69,22 +63,23 @@ where
// TODO when unit tests established, change to async connect:
let mut streams = Vec::new();
for node in &cluster.nodes {
debug!("open_tcp_streams to: {}:{}", node.host, node.port_raw);
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
let addr = format!("{}:{}", node.host, node.port_raw);
debug!("open_tcp_streams to: {addr}");
let net = TcpStream::connect(addr.clone()).await?;
let qjs = serde_json::to_string(&query)?;
let (netin, mut netout) = net.into_split();
let item = EventQueryJsonStringFrame(qjs);
let item = sitem_data(item);
let buf = make_frame(&item)?;
let item = sitem_data(EventQueryJsonStringFrame(qjs));
let buf = item.make_frame()?;
netout.write_all(&buf).await?;
let buf = make_term_frame()?;
netout.write_all(&buf).await?;
netout.flush().await?;
netout.forget();
// TODO for images, we need larger buffer capacity
let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 1024 * 2);
let frames = Box::pin(frames) as _;
let stream = EventsFromFrames::<T>::new(frames);
let perf_opts = PerfOpts::default();
let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
let frames = Box::pin(frames);
let stream = EventsFromFrames::<T>::new(frames, addr);
let stream = stream.map(|x| {
info!("tcp stream recv sees item {x:?}");
x