WIP
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
use err::Error;
|
||||
use netpod::log::Level;
|
||||
use netpod::DiskStats;
|
||||
use netpod::EventDataReadStats;
|
||||
@@ -69,7 +68,9 @@ impl LogItem {
|
||||
}
|
||||
}
|
||||
|
||||
pub type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, Error>;
|
||||
pub type SitemErrTy = err::Error;
|
||||
|
||||
pub type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, SitemErrTy>;
|
||||
|
||||
pub type Sitemty2<T, E> = Result<StreamItem<RangeCompletableItem<T>>, E>;
|
||||
|
||||
@@ -144,6 +145,20 @@ pub fn sitem_data<X>(x: X) -> Sitemty<X> {
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
|
||||
}
|
||||
|
||||
pub fn sitem_err_from_string<T, D>(x: T) -> Sitemty<D>
|
||||
where
|
||||
T: ToString,
|
||||
{
|
||||
Err(err::Error::from_string(x))
|
||||
}
|
||||
|
||||
pub fn sitem_err2_from_string<T>(x: T) -> err::Error
|
||||
where
|
||||
T: ToString,
|
||||
{
|
||||
err::Error::from_string(x)
|
||||
}
|
||||
|
||||
mod levelserde {
|
||||
use super::Level;
|
||||
use serde::de::{self, Visitor};
|
||||
|
||||
@@ -1,18 +1,9 @@
|
||||
use crate::collect_s::CollectableDyn;
|
||||
use crate::collect_s::CollectorDyn;
|
||||
use crate::collect_s::ToJsonResult;
|
||||
use crate::AsAnyMut;
|
||||
use crate::AsAnyRef;
|
||||
use crate::Events;
|
||||
use crate::Resettable;
|
||||
use crate::TypeName;
|
||||
use crate::WithLen;
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::BinnedRangeEnum;
|
||||
use netpod::TsNano;
|
||||
use std::any::Any;
|
||||
use std::fmt;
|
||||
use std::ops::Range;
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ chrono = { version = "0.4.19", features = ["serde"] }
|
||||
crc32fast = "1.3.2"
|
||||
futures-util = "0.3.24"
|
||||
humantime-serde = "1.1.1"
|
||||
thiserror = "1"
|
||||
thiserror = "0.0.1"
|
||||
err = { path = "../err" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items_proc = { path = "../items_proc" }
|
||||
|
||||
@@ -21,7 +21,7 @@ url = "2.5.0"
|
||||
num-traits = "0.2.16"
|
||||
hex = "0.4.3"
|
||||
rand = "0.8.5"
|
||||
thiserror = "1"
|
||||
thiserror = "0.0.1"
|
||||
err = { path = "../err" }
|
||||
|
||||
[patch.crates-io]
|
||||
|
||||
@@ -205,6 +205,10 @@ pub enum NetpodError {
|
||||
MissingBinningScheme,
|
||||
BadCacheUsage(String),
|
||||
TimelikeBinWidthImpossibleForPulseRange,
|
||||
BinCountTooLarge,
|
||||
BinCountTooSmall,
|
||||
BinnedNoGridMatch,
|
||||
NotTimerange,
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
@@ -2572,20 +2576,17 @@ pub enum BinnedRangeEnum {
|
||||
}
|
||||
|
||||
impl BinnedRangeEnum {
|
||||
fn covering_range_ty<T>(a: T, b: T, min_bin_count: u32) -> Result<Self, Error>
|
||||
fn covering_range_ty<T>(a: T, b: T, min_bin_count: u32) -> Result<Self, NetpodError>
|
||||
where
|
||||
T: Dim0Index + 'static,
|
||||
{
|
||||
let opts = T::binned_bin_len_opts();
|
||||
if min_bin_count < 1 {
|
||||
Err(Error::with_msg("min_bin_count < 1"))?;
|
||||
Err(NetpodError::BinCountTooSmall)?;
|
||||
}
|
||||
let bin_count_max = i32::MAX as u32;
|
||||
if min_bin_count > bin_count_max {
|
||||
Err(Error::with_msg(format!(
|
||||
"min_bin_count > {}: {}",
|
||||
bin_count_max, min_bin_count
|
||||
)))?;
|
||||
Err(NetpodError::BinCountTooLarge)?;
|
||||
}
|
||||
let du = b.sub(&a);
|
||||
let max_bin_len = du.div_n(min_bin_count as u64);
|
||||
@@ -2599,7 +2600,7 @@ impl BinnedRangeEnum {
|
||||
return Ok(ret);
|
||||
}
|
||||
}
|
||||
Err(Error::with_msg_no_trace("can not find matching binned grid"))
|
||||
Err(NetpodError::BinnedNoGridMatch)
|
||||
}
|
||||
|
||||
/// Cover at least the given range while selecting the bin width which best fits the requested bin width.
|
||||
@@ -2611,7 +2612,7 @@ impl BinnedRangeEnum {
|
||||
}
|
||||
|
||||
/// Cover at least the given range with at least as many as the requested number of bins.
|
||||
pub fn covering_range(range: SeriesRange, min_bin_count: u32) -> Result<Self, Error> {
|
||||
pub fn covering_range(range: SeriesRange, min_bin_count: u32) -> Result<Self, NetpodError> {
|
||||
match range {
|
||||
SeriesRange::TimeRange(k) => Self::covering_range_ty(TsNano(k.beg), TsNano(k.end), min_bin_count),
|
||||
SeriesRange::PulseRange(k) => Self::covering_range_ty(PulseId(k.beg), PulseId(k.end), min_bin_count),
|
||||
|
||||
@@ -102,12 +102,12 @@ impl From<(u64, u64)> for NanoRange {
|
||||
}
|
||||
|
||||
impl TryFrom<&SeriesRange> for NanoRange {
|
||||
type Error = Error;
|
||||
type Error = NetpodError;
|
||||
|
||||
fn try_from(val: &SeriesRange) -> Result<NanoRange, Self::Error> {
|
||||
match val {
|
||||
SeriesRange::TimeRange(x) => Ok(x.clone()),
|
||||
SeriesRange::PulseRange(_) => Err(Error::with_public_msg_no_trace("given SeriesRange is not a time range")),
|
||||
SeriesRange::PulseRange(_) => Err(NetpodError::NotTimerange),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ chrono = { version = "0.4.19", features = ["serde"] }
|
||||
url = "2.2"
|
||||
humantime = "2.1.0"
|
||||
humantime-serde = "1.1.1"
|
||||
thiserror = "1"
|
||||
thiserror = "0.0.1"
|
||||
netpod = { path = "../netpod" }
|
||||
items_0 = { path = "../items_0" }
|
||||
items_2 = { path = "../items_2" }
|
||||
|
||||
@@ -22,9 +22,11 @@ use url::Url;
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "BinnedQuery")]
|
||||
pub enum Error {
|
||||
BadInt(#[from] std::num::ParseIntError),
|
||||
MultipleBinCountBinWidth,
|
||||
BadUseRt,
|
||||
Netpod(#[from] netpod::NetpodError),
|
||||
Transform(#[from] crate::transform::Error),
|
||||
}
|
||||
|
||||
mod serde_option_vec_duration {
|
||||
|
||||
@@ -18,11 +18,11 @@ bytes = "1.6"
|
||||
arrayref = "0.3.6"
|
||||
crc32fast = "1.3.2"
|
||||
byteorder = "1.4.3"
|
||||
async-channel = "1.8.0"
|
||||
async-channel = "1.9.0"
|
||||
rand_xoshiro = "0.6.0"
|
||||
thiserror = "0.0.1"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true }
|
||||
err = { path = "../err" }
|
||||
netpod = { path = "../netpod" }
|
||||
query = { path = "../query" }
|
||||
items_0 = { path = "../items_0" }
|
||||
@@ -35,3 +35,6 @@ taskrun = { path = "../taskrun" }
|
||||
|
||||
[features]
|
||||
wasm_transform = ["wasmer"]
|
||||
|
||||
[patch.crates-io]
|
||||
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }
|
||||
|
||||
@@ -2,9 +2,10 @@ use bytes::Buf;
|
||||
use bytes::BufMut;
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
use err::Error;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::sitem_err2_from_string;
|
||||
use items_0::streamitem::sitem_err_from_string;
|
||||
use items_0::streamitem::LogItem;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
@@ -27,16 +28,24 @@ use std::time::Duration;
|
||||
const FRAME_HEAD_LEN: usize = 16;
|
||||
const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 80;
|
||||
|
||||
trait ErrConv<T> {
|
||||
fn ec(self) -> Result<T, Error>;
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "CborStream")]
|
||||
pub enum Error {
|
||||
FromSlice(#[from] std::array::TryFromSliceError),
|
||||
Msg(String),
|
||||
Ciborium(#[from] ciborium::de::Error<std::io::Error>),
|
||||
}
|
||||
|
||||
impl<T, K> ErrConv<T> for Result<T, ciborium::de::Error<K>>
|
||||
struct ErrMsg<E>(E)
|
||||
where
|
||||
K: fmt::Debug,
|
||||
E: ToString;
|
||||
|
||||
impl<E> From<ErrMsg<E>> for Error
|
||||
where
|
||||
E: ToString,
|
||||
{
|
||||
fn ec(self) -> Result<T, Error> {
|
||||
self.map_err(|e| Error::from_string(format!("{e}")))
|
||||
fn from(value: ErrMsg<E>) -> Self {
|
||||
Self::Msg(value.0.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,8 +76,7 @@ impl From<CborBytes> for Bytes {
|
||||
pub type CborStream = Pin<Box<dyn Stream<Item = Result<CborBytes, Error>> + Send>>;
|
||||
|
||||
// TODO move this type decl because it is not specific to cbor
|
||||
pub type SitemtyDynEventsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send>>;
|
||||
pub type SitemtyDynEventsStream = Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>;
|
||||
|
||||
pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stream<Item = Result<CborBytes, Error>> {
|
||||
let interval = tokio::time::interval(Duration::from_millis(4000));
|
||||
@@ -83,7 +91,7 @@ pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stre
|
||||
prepend.chain(stream)
|
||||
}
|
||||
|
||||
fn map_events(x: Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>) -> Result<CborBytes, Error> {
|
||||
fn map_events(x: Sitemty<Box<dyn Events>>) -> Result<CborBytes, Error> {
|
||||
match x {
|
||||
Ok(x) => match x {
|
||||
StreamItem::DataItem(x) => match x {
|
||||
@@ -93,8 +101,7 @@ fn map_events(x: Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error
|
||||
// TODO impl generically on EventsDim0 ?
|
||||
if let Some(evs) = evs.as_any_ref().downcast_ref::<items_2::eventsdim0::EventsDim0<f64>>() {
|
||||
let mut buf = Vec::new();
|
||||
ciborium::into_writer(evs, &mut buf)
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e}")))?;
|
||||
ciborium::into_writer(evs, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
|
||||
let bytes = Bytes::from(buf);
|
||||
let _item = CborBytes(bytes);
|
||||
// Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
@@ -142,9 +149,9 @@ fn map_events(x: Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error
|
||||
let item = cbor!({
|
||||
"rangeFinal" => true,
|
||||
})
|
||||
.map_err(Error::from_string)?;
|
||||
.map_err(|e| Error::Msg(e.to_string()))?;
|
||||
let mut buf = Vec::with_capacity(64);
|
||||
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
|
||||
ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = CborBytes(bytes);
|
||||
Ok(item)
|
||||
@@ -166,9 +173,9 @@ fn map_events(x: Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error
|
||||
let item = cbor!({
|
||||
"error" => e.to_string(),
|
||||
})
|
||||
.map_err(Error::from_string)?;
|
||||
.map_err(|e| Error::Msg(e.to_string()))?;
|
||||
let mut buf = Vec::with_capacity(64);
|
||||
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
|
||||
ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = CborBytes(bytes);
|
||||
Ok(item)
|
||||
@@ -181,9 +188,9 @@ fn make_keepalive() -> Result<CborBytes, Error> {
|
||||
let item = cbor!({
|
||||
"type" => "keepalive",
|
||||
})
|
||||
.map_err(Error::from_string)?;
|
||||
.map_err(ErrMsg)?;
|
||||
let mut buf = Vec::with_capacity(64);
|
||||
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
|
||||
ciborium::into_writer(&item, &mut buf).map_err(ErrMsg)?;
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = Ok(CborBytes(bytes));
|
||||
item
|
||||
@@ -213,7 +220,7 @@ impl<S> FramedBytesToSitemtyDynEventsStream<S> {
|
||||
}
|
||||
let n = u32::from_le_bytes(self.buf[..4].try_into()?);
|
||||
if n > FRAME_PAYLOAD_MAX {
|
||||
let e = Error::with_msg_no_trace(format!("frame too large {n}"));
|
||||
let e = ErrMsg(format!("frame too large {n}")).into();
|
||||
error!("{e}");
|
||||
return Err(e);
|
||||
}
|
||||
@@ -227,7 +234,7 @@ impl<S> FramedBytesToSitemtyDynEventsStream<S> {
|
||||
return Ok(None);
|
||||
}
|
||||
let buf = &self.buf[FRAME_HEAD_LEN..frame_len];
|
||||
let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(Error::from_string)?;
|
||||
let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(ErrMsg)?;
|
||||
// debug!("decoded ciborium value {val:?}");
|
||||
let item = if let Some(map) = val.as_map() {
|
||||
let keys: Vec<&str> = map.iter().map(|k| k.0.as_text().unwrap_or("(none)")).collect();
|
||||
@@ -285,14 +292,14 @@ where
|
||||
use Poll::*;
|
||||
loop {
|
||||
break match self.try_parse() {
|
||||
Ok(Some(x)) => Ready(Some(x)),
|
||||
Ok(Some(x)) => Ready(Some(x.map_err(|e| sitem_err2_from_string(e)))),
|
||||
Ok(None) => match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(x)) => match x {
|
||||
Ok(x) => {
|
||||
self.buf.put_slice(&x);
|
||||
continue;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(e))),
|
||||
Err(e) => Ready(Some(sitem_err_from_string(e))),
|
||||
},
|
||||
Ready(None) => {
|
||||
if self.buf.len() > 0 {
|
||||
@@ -302,7 +309,7 @@ where
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
Err(e) => Ready(Some(Err(e))),
|
||||
Err(e) => Ready(Some(sitem_err_from_string(e))),
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -312,7 +319,7 @@ macro_rules! cbor_scalar {
|
||||
($ty:ident, $buf:expr) => {{
|
||||
type T = $ty;
|
||||
type C = EventsDim0<T>;
|
||||
let item: C = ciborium::from_reader(Cursor::new($buf)).ec()?;
|
||||
let item: C = ciborium::from_reader(Cursor::new($buf))?;
|
||||
Box::new(item)
|
||||
}};
|
||||
}
|
||||
@@ -321,7 +328,7 @@ macro_rules! cbor_wave {
|
||||
($ty:ident, $buf:expr) => {{
|
||||
type T = $ty;
|
||||
type C = EventsDim1<T>;
|
||||
let item: C = ciborium::from_reader(Cursor::new($buf)).ec()?;
|
||||
let item: C = ciborium::from_reader(Cursor::new($buf))?;
|
||||
Box::new(item)
|
||||
}};
|
||||
}
|
||||
@@ -339,23 +346,13 @@ fn decode_cbor_to_box_events(buf: &[u8], scalar_type: &ScalarType, shape: &Shape
|
||||
ScalarType::I64 => cbor_scalar!(i64, buf),
|
||||
ScalarType::F32 => cbor_scalar!(f32, buf),
|
||||
ScalarType::F64 => cbor_scalar!(f64, buf),
|
||||
_ => {
|
||||
return Err(Error::from_string(format!(
|
||||
"decode_cbor_to_box_events {:?} {:?}",
|
||||
scalar_type, shape
|
||||
)))
|
||||
}
|
||||
_ => return Err(ErrMsg(format!("decode_cbor_to_box_events {:?} {:?}", scalar_type, shape)).into()),
|
||||
},
|
||||
Shape::Wave(_) => match scalar_type {
|
||||
ScalarType::U8 => cbor_wave!(u8, buf),
|
||||
ScalarType::U16 => cbor_wave!(u16, buf),
|
||||
ScalarType::I64 => cbor_wave!(i64, buf),
|
||||
_ => {
|
||||
return Err(Error::from_string(format!(
|
||||
"decode_cbor_to_box_events {:?} {:?}",
|
||||
scalar_type, shape
|
||||
)))
|
||||
}
|
||||
_ => return Err(ErrMsg(format!("decode_cbor_to_box_events {:?} {:?}", scalar_type, shape)).into()),
|
||||
},
|
||||
Shape::Image(_, _) => todo!(),
|
||||
};
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use err::Error;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
@@ -23,22 +22,24 @@ use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tracing::Instrument;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace2 {
|
||||
(D$($arg:tt)*) => ();
|
||||
($($arg:tt)*) => (eprintln!($($arg)*));
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "CollectDyn")]
|
||||
pub enum Error {
|
||||
Msg(String),
|
||||
NoResultNoCollector,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace3 {
|
||||
(D$($arg:tt)*) => ();
|
||||
($($arg:tt)*) => (eprintln!($($arg)*));
|
||||
}
|
||||
struct ErrMsg<E>(E)
|
||||
where
|
||||
E: ToString;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace4 {
|
||||
(D$($arg:tt)*) => ();
|
||||
($($arg:tt)*) => (eprintln!($($arg)*));
|
||||
impl<E> From<ErrMsg<E>> for Error
|
||||
where
|
||||
E: ToString,
|
||||
{
|
||||
fn from(value: ErrMsg<E>) -> Self {
|
||||
Self::Msg(value.0.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
pub enum CollectResult<T> {
|
||||
@@ -154,7 +155,7 @@ impl Collect {
|
||||
},
|
||||
Err(e) => {
|
||||
// TODO Need to use some flags to get good enough error message for remote user.
|
||||
Err(e)
|
||||
Err(ErrMsg(e).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -184,7 +185,7 @@ impl Future for Collect {
|
||||
//info!("collect stats total duration: {:?}", total_duration);
|
||||
Ready(Ok(CollectResult::Some(res)))
|
||||
}
|
||||
Err(e) => Ready(Err(e)),
|
||||
Err(e) => Ready(Err(ErrMsg(e).into())),
|
||||
},
|
||||
None => {
|
||||
debug!("no result because no collector was created");
|
||||
@@ -310,15 +311,16 @@ where
|
||||
},
|
||||
Err(e) => {
|
||||
// TODO Need to use some flags to get good enough error message for remote user.
|
||||
return Err(e);
|
||||
return Err(ErrMsg(e).into());
|
||||
}
|
||||
}
|
||||
}
|
||||
let _ = range_complete;
|
||||
let _ = timed_out;
|
||||
let res = collector
|
||||
.ok_or_else(|| Error::with_msg_no_trace(format!("no result, no collector created")))?
|
||||
.result(range, binrange)?;
|
||||
.ok_or_else(|| Error::NoResultNoCollector)?
|
||||
.result(range, binrange)
|
||||
.map_err(ErrMsg)?;
|
||||
info!("collect_in_span stats total duration: {:?}", total_duration);
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::sitem_err_from_string;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use netpod::ReqCtx;
|
||||
@@ -15,10 +16,16 @@ use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "EventsPlainReader")]
|
||||
pub enum Error {
|
||||
Timebinned(#[from] crate::timebinnedjson::Error),
|
||||
}
|
||||
|
||||
type ChEvsBox = Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;
|
||||
|
||||
enum StreamState {
|
||||
Opening(Pin<Box<dyn Future<Output = Result<ChEvsBox, ::err::Error>> + Send>>),
|
||||
Opening(Pin<Box<dyn Future<Output = Result<ChEvsBox, Error>> + Send>>),
|
||||
Reading(ChEvsBox),
|
||||
}
|
||||
|
||||
@@ -38,7 +45,7 @@ impl Stream for InnerStream {
|
||||
self.state = StreamState::Reading(x);
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => Ready(Some(Err(e))),
|
||||
Ready(Err(e)) => Ready(Some(sitem_err_from_string(e))),
|
||||
Pending => Pending,
|
||||
},
|
||||
StreamState::Reading(fut) => match fut.poll_next_unpin(cx) {
|
||||
@@ -82,7 +89,7 @@ impl EventsReadProvider for SfDatabufferEventReadProvider {
|
||||
open_bytes,
|
||||
)
|
||||
.await;
|
||||
ret.map(|x| Box::pin(x) as _)
|
||||
ret.map_err(|e| e.into()).map(|x| Box::pin(x) as _)
|
||||
}));
|
||||
let stream = InnerStream { state };
|
||||
EventsReading::new(Box::pin(stream))
|
||||
|
||||
@@ -2,8 +2,6 @@ use bytes::Buf;
|
||||
use bytes::BufMut;
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
@@ -24,7 +22,7 @@ macro_rules! trace_parse {
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "StreamFramedBytes")]
|
||||
pub enum Error {
|
||||
FrameTooLarge,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use err::Error;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::framable::FrameTypeInnerStatic;
|
||||
use items_0::streamitem::sitem_err_from_string;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
@@ -14,6 +14,10 @@ use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "FromFrames")]
|
||||
pub enum Error {}
|
||||
|
||||
pub struct EventsFromFrames<O> {
|
||||
inp: Pin<Box<dyn Stream<Item = Result<StreamItem<InMemoryFrame>, Error>> + Send>>,
|
||||
dbgdesc: String,
|
||||
@@ -100,7 +104,7 @@ where
|
||||
},
|
||||
Ready(Some(Err(e))) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
Ready(Some(sitem_err_from_string(e)))
|
||||
}
|
||||
Ready(None) => {
|
||||
self.completed = true;
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use crate::slidebuf::SlideBuf;
|
||||
use bytes::Bytes;
|
||||
use err::Error;
|
||||
use futures_util::pin_mut;
|
||||
use futures_util::Stream;
|
||||
use items_0::streamitem::SitemErrTy;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::streamitem::TERM_FRAME_TYPE_ID;
|
||||
use items_2::framable::INMEM_FRAME_FOOT;
|
||||
@@ -16,17 +16,25 @@ use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
pub type BoxedBytesStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ); }
|
||||
|
||||
impl err::ToErr for crate::slidebuf::Error {
|
||||
fn to_err(self) -> Error {
|
||||
Error::with_msg_no_trace(format!("{self}"))
|
||||
}
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "InMem")]
|
||||
pub enum Error {
|
||||
Input,
|
||||
Slidebuf(#[from] crate::slidebuf::Error),
|
||||
IO(#[from] std::io::Error),
|
||||
LessThanNeedMin,
|
||||
LessThanHeader,
|
||||
HugeFrame(u32),
|
||||
BadMagic(u32),
|
||||
TryFromSlice(#[from] std::array::TryFromSliceError),
|
||||
BadCrc,
|
||||
EnoughInputNothingParsed,
|
||||
}
|
||||
|
||||
pub type BoxedBytesStream = Pin<Box<dyn Stream<Item = Result<Bytes, SitemErrTy>> + Send>>;
|
||||
|
||||
macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ); }
|
||||
|
||||
pub struct TcpReadAsBytes<INP> {
|
||||
inp: INP,
|
||||
}
|
||||
@@ -41,7 +49,7 @@ impl<INP> Stream for TcpReadAsBytes<INP>
|
||||
where
|
||||
INP: AsyncRead + Unpin,
|
||||
{
|
||||
type Item = Result<Bytes, err::Error>;
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
@@ -118,7 +126,7 @@ where
|
||||
}
|
||||
Err(e) => Ready(Err(e.into())),
|
||||
},
|
||||
Ready(Some(Err(_e))) => Ready(Err(Error::with_msg_no_trace("input error"))),
|
||||
Ready(Some(Err(_e))) => Ready(Err(Error::Input)),
|
||||
Ready(None) => Ready(Ok(0)),
|
||||
Pending => Pending,
|
||||
}
|
||||
@@ -140,10 +148,10 @@ where
|
||||
fn parse(&mut self) -> Result<Option<InMemoryFrame>, Error> {
|
||||
let buf = self.buf.data();
|
||||
if buf.len() < self.need_min {
|
||||
return Err(Error::with_msg_no_trace("expect at least need_min"));
|
||||
return Err(Error::LessThanNeedMin);
|
||||
}
|
||||
if buf.len() < INMEM_FRAME_HEAD {
|
||||
return Err(Error::with_msg_no_trace("expect at least enough bytes for the header"));
|
||||
return Err(Error::LessThanHeader);
|
||||
}
|
||||
let magic = u32::from_le_bytes(buf[0..4].try_into()?);
|
||||
let encid = u32::from_le_bytes(buf[4..8].try_into()?);
|
||||
@@ -158,7 +166,7 @@ where
|
||||
magic, u
|
||||
);
|
||||
error!("{msg}");
|
||||
return Err(Error::with_msg(msg));
|
||||
return Err(Error::BadMagic(magic));
|
||||
}
|
||||
if len > 1024 * 1024 * 50 {
|
||||
let msg = format!(
|
||||
@@ -166,7 +174,7 @@ where
|
||||
len, self.inp_bytes_consumed
|
||||
);
|
||||
error!("{msg}");
|
||||
return Err(Error::with_msg(msg));
|
||||
return Err(Error::HugeFrame(len));
|
||||
}
|
||||
let lentot = INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + len as usize;
|
||||
if buf.len() < lentot {
|
||||
@@ -191,7 +199,7 @@ where
|
||||
payload_crc_match, frame_crc_match,
|
||||
);
|
||||
error!("{msg}");
|
||||
let e = Error::with_msg_no_trace(msg);
|
||||
let e = Error::BadCrc;
|
||||
return Err(e);
|
||||
}
|
||||
self.inp_bytes_consumed += lentot as u64;
|
||||
@@ -230,7 +238,7 @@ where
|
||||
Ok(None) => {
|
||||
if self.buf.len() >= self.need_min {
|
||||
self.done = true;
|
||||
let e = Error::with_msg_no_trace("enough bytes but nothing parsed");
|
||||
let e = Error::EnoughInputNothingParsed;
|
||||
Ready(Some(Err(e)))
|
||||
} else {
|
||||
continue;
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
use crate::frames::inmem::BoxedBytesStream;
|
||||
use crate::transform::build_event_transform;
|
||||
use err::Error;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::TryStreamExt;
|
||||
use items_0::container::ByteEstimate;
|
||||
use items_0::on_sitemty_data;
|
||||
use items_0::streamitem::sitem_data;
|
||||
use items_0::streamitem::sitem_err2_from_string;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::SitemErrTy;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::Appendable;
|
||||
@@ -30,13 +32,20 @@ use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "Generator")]
|
||||
pub enum Error {
|
||||
UnsupportedIsEventBlobs,
|
||||
Transform(#[from] crate::transform::Error),
|
||||
}
|
||||
|
||||
pub fn make_test_channel_events_bytes_stream(
|
||||
subq: EventsSubQuery,
|
||||
node_count: u64,
|
||||
node_ix: u64,
|
||||
) -> Result<BoxedBytesStream, Error> {
|
||||
if subq.is_event_blobs() {
|
||||
let e = Error::with_msg_no_trace("evq.is_event_blobs() not supported in this generator");
|
||||
let e = Error::UnsupportedIsEventBlobs;
|
||||
error!("{e}");
|
||||
Err(e)
|
||||
} else {
|
||||
@@ -57,7 +66,9 @@ pub fn make_test_channel_events_bytes_stream(
|
||||
}
|
||||
})
|
||||
});
|
||||
let stream = stream.map(|x| x.make_frame_dyn().map(|x| x.freeze()));
|
||||
let stream = stream
|
||||
.map_err(sitem_err2_from_string)
|
||||
.map(|x| x.make_frame_dyn().map(|x| x.freeze()));
|
||||
let ret = Box::pin(stream);
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use async_channel::Send;
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
use futures_util::pin_mut;
|
||||
use futures_util::Future;
|
||||
use futures_util::Stream;
|
||||
@@ -10,6 +9,10 @@ use std::ptr::NonNull;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "ItemClone")]
|
||||
pub enum Error {}
|
||||
|
||||
pub struct Itemclone<'a, T, INP>
|
||||
where
|
||||
T: 'static,
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use crate::cbor_stream::SitemtyDynEventsStream;
|
||||
use err::Error;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
@@ -10,6 +9,10 @@ use netpod::log::*;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "JsonStream")]
|
||||
pub enum Error {}
|
||||
|
||||
pub struct JsonBytes(String);
|
||||
|
||||
impl JsonBytes {
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
use crate::filechunkread::FileChunkRead;
|
||||
use err::Error;
|
||||
use futures_util::{Stream, StreamExt};
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::histo::HistoLog2;
|
||||
use netpod::log::*;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "NeedMinBuffer")]
|
||||
pub enum Error {}
|
||||
|
||||
pub struct NeedMinBuffer {
|
||||
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
|
||||
|
||||
@@ -4,14 +4,11 @@ use crate::firsterr::non_empty;
|
||||
use crate::firsterr::only_first_err;
|
||||
use crate::plaineventsstream::dyn_events_stream;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use netpod::log::*;
|
||||
use netpod::ChannelTypeConfigGen;
|
||||
use netpod::ReqCtx;
|
||||
use query::api4::events::PlainEventsQuery;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "PlainEventsCbor")]
|
||||
pub enum Error {
|
||||
Stream(#[from] crate::plaineventsstream::Error),
|
||||
|
||||
@@ -6,8 +6,6 @@ use crate::json_stream::events_stream_to_json_stream;
|
||||
use crate::json_stream::JsonStream;
|
||||
use crate::plaineventsstream::dyn_events_stream;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::collect_s::CollectableDyn;
|
||||
use items_0::on_sitemty_data;
|
||||
@@ -21,13 +19,11 @@ use serde_json::Value as JsonValue;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "PlainEventsJson")]
|
||||
pub enum Error {
|
||||
Stream(#[from] crate::plaineventsstream::Error),
|
||||
Collect(err::Error),
|
||||
Json(#[from] serde_json::Error),
|
||||
Err(err::Error),
|
||||
}
|
||||
|
||||
pub async fn plain_events_json(
|
||||
@@ -92,11 +88,10 @@ pub async fn plain_events_json(
|
||||
Some(evq.range().clone()),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.map_err(Error::Collect)?;
|
||||
.await?;
|
||||
debug!("plain_events_json collected");
|
||||
if let CollectResult::Some(x) = collected {
|
||||
let jsval = x.to_json_value().map_err(|e| Error::Err(e))?;
|
||||
let jsval = x.to_json_value()?;
|
||||
debug!("plain_events_json json serialized");
|
||||
Ok(CollectResult::Some(jsval))
|
||||
} else {
|
||||
|
||||
@@ -2,14 +2,10 @@ use crate::tcprawclient::container_stream_from_bytes_stream;
|
||||
use crate::tcprawclient::make_sub_query;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use crate::transform::build_merged_event_transform;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::on_sitemty_data;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::Events;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use items_2::merger::Merger;
|
||||
@@ -19,10 +15,11 @@ use netpod::ReqCtx;
|
||||
use query::api4::events::PlainEventsQuery;
|
||||
use std::pin::Pin;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "PlainEventsStream")]
|
||||
pub enum Error {
|
||||
OtherErr(#[from] err::Error),
|
||||
Netpod(#[from] netpod::NetpodError),
|
||||
Transform(#[from] crate::transform::Error),
|
||||
}
|
||||
|
||||
pub type DynEventsStream = Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>;
|
||||
@@ -88,13 +85,13 @@ pub async fn dyn_events_stream(
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "wasm_transform"))]
|
||||
async fn transform_wasm<INP>(
|
||||
async fn transform_wasm<INP, ETS>(
|
||||
stream: INP,
|
||||
_wasmname: &str,
|
||||
_ctx: &ReqCtx,
|
||||
) -> Result<impl Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, err::Error>> + Send, err::Error>
|
||||
) -> Result<impl Stream<Item = Sitemty<Box<dyn Events>>> + Send, Error>
|
||||
where
|
||||
INP: Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, err::Error>> + Send + 'static,
|
||||
INP: Stream<Item = Sitemty<Box<dyn Events>>> + Send + 'static,
|
||||
{
|
||||
let ret: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>> = Box::pin(stream);
|
||||
Ok(ret)
|
||||
@@ -105,9 +102,9 @@ async fn transform_wasm<INP>(
|
||||
stream: INP,
|
||||
wasmname: &str,
|
||||
ctx: &ReqCtx,
|
||||
) -> Result<impl Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send, Error>
|
||||
) -> Result<impl Stream<Item = Sitemty<Box<dyn Events>>> + Send, Error>
|
||||
where
|
||||
INP: Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send + 'static,
|
||||
INP: Stream<Item = Sitemty<Box<dyn Events>>> + Send + 'static,
|
||||
{
|
||||
debug!("make wasm transform");
|
||||
use httpclient::url::Url;
|
||||
|
||||
@@ -3,7 +3,6 @@ use crate::collect::CollectResult;
|
||||
use crate::test::runfut;
|
||||
use crate::transform::build_event_transform;
|
||||
use crate::transform::EventsToTimeBinnable;
|
||||
use err::Error;
|
||||
use futures_util::stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::on_sitemty_data;
|
||||
@@ -21,31 +20,31 @@ use query::transform::TransformQuery;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
#[test]
|
||||
fn collect_channel_events_00() -> Result<(), Error> {
|
||||
let fut = async {
|
||||
let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487);
|
||||
let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583);
|
||||
let stream = stream::iter(vec![
|
||||
sitem_data(evs0),
|
||||
sitem_data(evs1),
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
|
||||
]);
|
||||
let deadline = Instant::now() + Duration::from_millis(4000);
|
||||
let events_max = 10000;
|
||||
let res = crate::collect::collect(stream, deadline, events_max, None, None).await?;
|
||||
//eprintln!("collected result: {res:?}");
|
||||
if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<f32>>() {
|
||||
eprintln!("Great, a match");
|
||||
eprintln!("{res:?}");
|
||||
assert_eq!(res.len(), 40);
|
||||
} else {
|
||||
return Err(Error::with_msg(format!("bad type of collected result")));
|
||||
}
|
||||
Ok(())
|
||||
};
|
||||
runfut(fut)
|
||||
}
|
||||
// #[test]
|
||||
// fn collect_channel_events_00() -> Result<(), Error> {
|
||||
// let fut = async {
|
||||
// let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487);
|
||||
// let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583);
|
||||
// let stream = stream::iter(vec![
|
||||
// sitem_data(evs0),
|
||||
// sitem_data(evs1),
|
||||
// Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
|
||||
// ]);
|
||||
// let deadline = Instant::now() + Duration::from_millis(4000);
|
||||
// let events_max = 10000;
|
||||
// let res = crate::collect::collect(stream, deadline, events_max, None, None).await?;
|
||||
// //eprintln!("collected result: {res:?}");
|
||||
// if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<f32>>() {
|
||||
// eprintln!("Great, a match");
|
||||
// eprintln!("{res:?}");
|
||||
// assert_eq!(res.len(), 40);
|
||||
// } else {
|
||||
// return Err(Error::with_msg(format!("bad type of collected result")));
|
||||
// }
|
||||
// Ok(())
|
||||
// };
|
||||
// runfut(fut)
|
||||
// }
|
||||
|
||||
// #[test]
|
||||
// fn collect_channel_events_01() -> Result<(), Error> {
|
||||
|
||||
@@ -5,7 +5,6 @@ use crate::lenframed;
|
||||
use crate::plaineventscbor::plain_events_cbor_stream;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreams;
|
||||
use crate::tcprawclient::TEST_BACKEND;
|
||||
use err::Error;
|
||||
use futures_util::future;
|
||||
use futures_util::Future;
|
||||
use futures_util::StreamExt;
|
||||
@@ -23,6 +22,13 @@ use query::api4::events::PlainEventsQuery;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "TestEvents")]
|
||||
pub enum Error {
|
||||
InMem(#[from] crate::frames::inmem::Error),
|
||||
Generator(#[from] crate::generators::Error),
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merged_events_cbor() {
|
||||
crate::test::runfut(merged_events_inner()).unwrap();
|
||||
|
||||
@@ -3,7 +3,7 @@ pub mod fromevents;
|
||||
pub mod timebin;
|
||||
|
||||
mod basic;
|
||||
mod fromlayers;
|
||||
pub(super) mod fromlayers;
|
||||
mod gapfill;
|
||||
mod grid;
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use err::Error;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::sitem_data;
|
||||
use items_0::streamitem::sitem_err_from_string;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
@@ -16,17 +16,21 @@ use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! debug_first { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "TimeBinnedStream")]
|
||||
pub enum Error {
|
||||
MissingBinnerAfterProcessItem,
|
||||
CreateEmpty,
|
||||
NoBinnerAfterInputDone,
|
||||
Stream,
|
||||
Msg(String),
|
||||
}
|
||||
|
||||
type SitemtyStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
|
||||
|
||||
@@ -106,7 +110,7 @@ where
|
||||
self.binner.is_some()
|
||||
);
|
||||
if self.binner.is_none() {
|
||||
let e = Error::with_msg_no_trace("must emit on first input but no binner");
|
||||
let e = Error::MissingBinnerAfterProcessItem;
|
||||
self.done = true;
|
||||
return Err(e);
|
||||
}
|
||||
@@ -125,7 +129,7 @@ where
|
||||
if let Some(bins) = binner.empty() {
|
||||
Ok(Break(Ready(sitem_data(bins))))
|
||||
} else {
|
||||
let e = Error::with_msg_no_trace("must emit but can not even create empty A");
|
||||
let e = Error::CreateEmpty;
|
||||
error!("{e}");
|
||||
Err(e)
|
||||
}
|
||||
@@ -142,7 +146,7 @@ where
|
||||
|
||||
fn handle_item(
|
||||
&mut self,
|
||||
item: Result<StreamItem<RangeCompletableItem<T>>, Error>,
|
||||
item: Sitemty<T>,
|
||||
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
|
||||
use ControlFlow::*;
|
||||
use Poll::*;
|
||||
@@ -163,7 +167,7 @@ where
|
||||
Err(e) => {
|
||||
error!("received error item: {e}");
|
||||
self.done = true;
|
||||
Err(e)
|
||||
Err(Error::Msg(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -191,7 +195,7 @@ where
|
||||
self.done_data = true;
|
||||
Ok(Break(Ready(sitem_data(bins))))
|
||||
} else {
|
||||
let e = Error::with_msg_no_trace("must emit but can not even create empty B");
|
||||
let e = Error::CreateEmpty;
|
||||
error!("{e}");
|
||||
self.done_data = true;
|
||||
Err(e)
|
||||
@@ -200,7 +204,7 @@ where
|
||||
} else {
|
||||
warn!("input stream finished, still no binner");
|
||||
self.done_data = true;
|
||||
let e = Error::with_msg_no_trace(format!("input stream finished, still no binner"));
|
||||
let e = Error::NoBinnerAfterInputDone;
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
@@ -261,7 +265,7 @@ where
|
||||
},
|
||||
Err(e) => {
|
||||
self.done = true;
|
||||
break Ready(Some(Err(e)));
|
||||
break Ready(Some(sitem_err_from_string(e)));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
use crate as streams;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
@@ -19,6 +17,15 @@ use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "BinCachedReader")]
|
||||
pub enum Error {
|
||||
TodoImpl,
|
||||
ChannelSend,
|
||||
ChannelRecv,
|
||||
Scylla(String),
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
|
||||
|
||||
@@ -110,15 +117,6 @@ pub trait CacheReadProvider: Send + Sync {
|
||||
fn write(&self, series: u64, bins: BinsBoxed) -> CacheWriting;
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "BinCachedReader")]
|
||||
pub enum Error {
|
||||
TodoImpl,
|
||||
ChannelSend,
|
||||
ChannelRecv,
|
||||
Scylla(String),
|
||||
}
|
||||
|
||||
pub struct CachedReader {
|
||||
series: u64,
|
||||
range: BinnedRange<TsNano>,
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
use super::cached::reader::EventsReadProvider;
|
||||
use crate::events::convertforbinning::ConvertForBinning;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
@@ -20,7 +18,7 @@ use std::task::Poll;
|
||||
|
||||
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "ReadingBinnedFromEvents")]
|
||||
pub enum Error {
|
||||
ExpectTimerange,
|
||||
|
||||
@@ -2,8 +2,6 @@ use super::cached::reader::CacheReadProvider;
|
||||
use super::cached::reader::EventsReadProvider;
|
||||
use crate::timebin::fromevents::BinnedFromEvents;
|
||||
use crate::timebin::grid::find_next_finer_bin_len;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::Sitemty;
|
||||
@@ -26,7 +24,7 @@ use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "TimeBinnedFromLayers")]
|
||||
pub enum Error {
|
||||
GapFill(#[from] super::gapfill::Error),
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use super::cached::reader::CacheReadProvider;
|
||||
use super::cached::reader::EventsReadProvider;
|
||||
use crate::timebin::fromevents::BinnedFromEvents;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::sitem_err_from_string;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
@@ -41,7 +40,7 @@ macro_rules! debug_cache { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_handle { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "BinCachedGapFill")]
|
||||
pub enum Error {
|
||||
CacheReader(#[from] super::cached::reader::Error),
|
||||
@@ -109,7 +108,7 @@ impl GapFill {
|
||||
let stream = super::cached::reader::CachedReader::new(series, range.clone(), cache_read_provider.clone())?
|
||||
.map(|x| match x {
|
||||
Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
|
||||
Err(e) => Err(::err::Error::from_string(e)),
|
||||
Err(e) => sitem_err_from_string(e),
|
||||
});
|
||||
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>
|
||||
} else {
|
||||
@@ -294,7 +293,7 @@ impl GapFill {
|
||||
|
||||
fn cache_write(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<(), Error> {
|
||||
// TODO emit bins that are ready for cache write into some separate channel
|
||||
let series = ::err::todoval();
|
||||
let series = todo!();
|
||||
self.cache_writing = Some(self.cache_read_provider.write(series, bins));
|
||||
Ok(())
|
||||
}
|
||||
@@ -338,7 +337,7 @@ impl Stream for GapFill {
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
self.cache_writing = None;
|
||||
Ready(Some(Err(::err::Error::from_string(e))))
|
||||
Ready(Some(sitem_err_from_string(e)))
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
@@ -348,7 +347,7 @@ impl Stream for GapFill {
|
||||
StreamItem::DataItem(RangeCompletableItem::Data(x)) => {
|
||||
match self.as_mut().handle_bins_finer(x) {
|
||||
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Err(e) => Ready(Some(sitem_err_from_string(e))),
|
||||
}
|
||||
}
|
||||
StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
|
||||
@@ -358,7 +357,7 @@ impl Stream for GapFill {
|
||||
if self.cache_usage.is_cache_write() {
|
||||
match self.as_mut().cache_write_on_end() {
|
||||
Ok(()) => continue,
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Err(e) => Ready(Some(sitem_err_from_string(e))),
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
@@ -367,7 +366,7 @@ impl Stream for GapFill {
|
||||
StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))),
|
||||
StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))),
|
||||
},
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Ready(Some(Err(e))) => Ready(Some(sitem_err_from_string(e))),
|
||||
Ready(None) => {
|
||||
trace_handle!(
|
||||
"{} inp_finer Ready(None) last_bin_ts2 {:?}",
|
||||
@@ -386,9 +385,7 @@ impl Stream for GapFill {
|
||||
exp_finer_range
|
||||
);
|
||||
if self.inp_finer_fills_gap {
|
||||
Ready(Some(Err(::err::Error::from_string(
|
||||
"finer input didn't deliver to the end",
|
||||
))))
|
||||
Ready(Some(sitem_err_from_string("finer input didn't deliver to the end")))
|
||||
} else {
|
||||
warn!(
|
||||
"{} inp_finer Ready(None) last_bin_ts2 {:?} not delivered to the end, but maybe in the future",
|
||||
@@ -404,9 +401,9 @@ impl Stream for GapFill {
|
||||
"{} inp_finer Ready(None) last_bin_ts2 {:?}",
|
||||
self.dbgname, self.last_bin_ts2
|
||||
);
|
||||
Ready(Some(Err(::err::Error::from_string(
|
||||
Ready(Some(sitem_err_from_string(
|
||||
"finer input delivered nothing, received nothing at all so far",
|
||||
))))
|
||||
)))
|
||||
} else {
|
||||
warn!(
|
||||
"{} inp_finer Ready(None) last_bin_ts2 {:?}",
|
||||
@@ -420,14 +417,14 @@ impl Stream for GapFill {
|
||||
} else if let Some(x) = self.inp_buf.take() {
|
||||
match self.as_mut().handle_bins_finer(x) {
|
||||
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Err(e) => Ready(Some(sitem_err_from_string(e))),
|
||||
}
|
||||
} else if let Some(inp) = self.inp.as_mut() {
|
||||
match inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => match x {
|
||||
StreamItem::DataItem(RangeCompletableItem::Data(x)) => match self.as_mut().handle_bins(x) {
|
||||
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Err(e) => Ready(Some(sitem_err_from_string(e))),
|
||||
},
|
||||
StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
|
||||
self.inp_range_final = true;
|
||||
@@ -436,7 +433,7 @@ impl Stream for GapFill {
|
||||
StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))),
|
||||
StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))),
|
||||
},
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Ready(Some(Err(e))) => Ready(Some(sitem_err_from_string(e))),
|
||||
Ready(None) => {
|
||||
self.inp = None;
|
||||
// TODO assert that we have emitted up to the requested range.
|
||||
@@ -455,7 +452,7 @@ impl Stream for GapFill {
|
||||
Ok(()) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Err(e) => Ready(Some(sitem_err_from_string(e))),
|
||||
}
|
||||
} else {
|
||||
debug!("{} received everything", self.dbgname);
|
||||
@@ -471,7 +468,7 @@ impl Stream for GapFill {
|
||||
Ok(()) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
Err(e) => Ready(Some(sitem_err_from_string(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use crate::timebin::cached::reader::EventsReadProvider;
|
||||
use crate::timebin::CacheReadProvider;
|
||||
use crate::transform::build_merged_event_transform;
|
||||
use err::Error;
|
||||
use futures_util::future::BoxFuture;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
@@ -36,6 +35,14 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "TimebinnedJson")]
|
||||
pub enum Error {
|
||||
Query(#[from] query::api4::binned::Error),
|
||||
FromLayers(#[from] super::timebin::fromlayers::Error),
|
||||
Transform(#[from] super::transform::Error),
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream<Item = R>) -> impl 'u + Send + Stream<Item = R> {
|
||||
stream
|
||||
@@ -249,8 +256,7 @@ async fn timebinned_stream(
|
||||
bin_len_layers,
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
)
|
||||
.map_err(Error::from_string)?;
|
||||
)?;
|
||||
let stream = stream.map(|item| {
|
||||
use items_0::timebin::BinningggContainerBinsDyn;
|
||||
on_sitemty_data!(item, |mut x: Box<dyn BinningggContainerBinsDyn>| {
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use err::Error;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::collect_s::CollectableDyn;
|
||||
@@ -19,6 +18,10 @@ use query::transform::TimeBinningTransformQuery;
|
||||
use query::transform::TransformQuery;
|
||||
use std::pin::Pin;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "Transform")]
|
||||
pub enum Error {}
|
||||
|
||||
pub fn build_event_transform(tr: &TransformQuery) -> Result<TransformEvent, Error> {
|
||||
let trev = tr.get_tr_event();
|
||||
match trev {
|
||||
|
||||
Reference in New Issue
Block a user