diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index 4aa630a..245bf77 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -151,7 +151,9 @@ fn make_keepalive() -> Result { pub struct FramedBytesToChannelEventsStream { inp: S, + #[allow(unused)] scalar_type: ScalarType, + #[allow(unused)] shape: Shape, buf: BytesMut, } @@ -222,6 +224,8 @@ impl FramedBytesToChannelEventsStream { } else { None }; + debug!("TODO channel events discarded"); + drop(item); let item = None; let item = if let Some(x) = item { Some(x) diff --git a/src/collect.rs b/src/collect.rs index b392041..b5786bc 100644 --- a/src/collect.rs +++ b/src/collect.rs @@ -11,10 +11,7 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StatsItem; use items_0::streamitem::StreamItem; -use items_0::WithLen; use netpod::log::*; -use netpod::range::evrange::SeriesRange; -use netpod::BinnedRangeEnum; use netpod::DiskStats; use std::pin::Pin; use std::task::Context; @@ -56,8 +53,6 @@ pub struct Collect { inp: Pin> + Send>>, events_max: u64, bytes_max: u64, - range: Option, - binrange: Option, collector: Option>, range_final: bool, timeout: bool, @@ -74,16 +69,12 @@ where deadline: Instant, events_max: u64, bytes_max: u64, - range: Option, - binrange: Option, timeout_provider: Box, ) -> Self { Self { inp, events_max, bytes_max, - range, - binrange, collector: None, range_final: false, timeout: false, @@ -114,7 +105,7 @@ where info!("reached events_max {} / {}", coll.len(), self.events_max); self.done_input = true; } - if coll.byte_estimate() >= self.bytes_max { + if coll.byte_estimate() as u64 >= self.bytes_max { info!( "reached bytes_max {} / {}", coll.byte_estimate(), diff --git a/src/frames/inmem.rs b/src/frames/inmem.rs index ba688f0..0274781 100644 --- a/src/frames/inmem.rs +++ b/src/frames/inmem.rs @@ -1,4 +1,4 @@ -use crate::log::*; +use crate::log; use crate::slidebuf::SlideBuf; use bytes::Bytes; use futures_util::pin_mut; @@ -16,25 +16,26 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "InMem")] -pub enum Error { - Input, - Slidebuf(#[from] crate::slidebuf::Error), - IO(#[from] std::io::Error), - LessThanNeedMin, - LessThanHeader, - HugeFrame(u32), - BadMagic(u32), - TryFromSlice(#[from] std::array::TryFromSliceError), - BadCrc, - EnoughInputNothingParsed, - InMemParse(#[from] items_2::inmem::Error), -} +autoerr::create_error_v1!( + name(Error, "InMem"), + enum variants { + Input, + Slidebuf(#[from] crate::slidebuf::Error), + IO(#[from] std::io::Error), + LessThanNeedMin, + LessThanHeader, + HugeFrame(u32), + BadMagic(u32), + TryFromSlice(#[from] std::array::TryFromSliceError), + BadCrc, + EnoughInputNothingParsed, + InMemParse(#[from] items_2::inmem::Error), + }, +); pub type BoxedBytesStream = Pin> + Send>>; -macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ); } +macro_rules! trace2 { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); } /// Interprets a byte stream as length-delimited frames. /// @@ -84,7 +85,16 @@ where self.buf.wadv(x.len())?; Ready(Ok(x.len())) } - Err(e) => Ready(Err(e.into())), + Err(e) => { + log::error!( + "{} {:?} inp len {} need_min {}", + e, + self.buf, + x.len(), + self.need_min + ); + Ready(Err(e.into())) + } }, Ready(Some(Err(_e))) => Ready(Err(Error::Input)), Ready(None) => Ready(Ok(0)), @@ -127,7 +137,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let span = span!(Level::INFO, "InMemRd"); + let span = log::span!(log::Level::INFO, "InMemRd"); let _spanguard = span.enter(); loop { break if self.complete { @@ -171,9 +181,11 @@ where } } Ready(Err(e)) => { - error!( + log::error!( "poll_upstream need_min {} buf {:?} {:?}", - self.need_min, self.buf, e + self.need_min, + self.buf, + e ); self.done = true; Ready(Some(Err(sitem_err2_from_string(e)))) diff --git a/src/plaineventsjson.rs b/src/plaineventsjson.rs index b028ed5..383443d 100644 --- a/src/plaineventsjson.rs +++ b/src/plaineventsjson.rs @@ -6,20 +6,13 @@ use crate::json_stream::events_stream_to_json_stream; use crate::json_stream::JsonStream; use crate::plaineventsstream::dyn_events_stream; use crate::streamtimeout::StreamTimeout2; -use crate::streamtimeout::TimeoutableStream; use crate::tcprawclient::OpenBoxedBytesStreamsBox; -use futures_util::StreamExt; -use items_0::collect_s::CollectableDyn; -use items_0::on_sitemty_data; use items_2::jsonbytes::JsonBytes; use netpod::log::*; use netpod::ChannelTypeConfigGen; use netpod::Cluster; -use netpod::HasTimeout; use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; -use serde_json::Value as JsonValue; -use std::time::Duration; use std::time::Instant; autoerr::create_error_v1!( @@ -52,8 +45,6 @@ pub async fn plain_events_json( deadline, evq.events_max(), evq.bytes_max(), - Some(evq.range().clone()), - None, timeout_provider, ) .await?; diff --git a/src/timebin/fromevents.rs b/src/timebin/fromevents.rs index 8821bf0..9d65d61 100644 --- a/src/timebin/fromevents.rs +++ b/src/timebin/fromevents.rs @@ -1,6 +1,6 @@ use super::cached::reader::EventsReadProvider; use crate::events::convertforbinning::ConvertForBinning; -use crate::log::*; +use crate::log; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::Sitemty; @@ -14,7 +14,7 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -macro_rules! trace_emit { ($($arg:expr),*) => ( if true { trace!($($arg),*); } ) } +macro_rules! trace_emit { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } autoerr::create_error_v1!( name(Error, "ReadingBinnedFromEvents"), diff --git a/src/timebin/fromlayers.rs b/src/timebin/fromlayers.rs index c752d8b..60c1f23 100644 --- a/src/timebin/fromlayers.rs +++ b/src/timebin/fromlayers.rs @@ -1,7 +1,7 @@ use super::cached::reader::CacheReadProvider; use super::cached::reader::EventsReadProvider; use super::opts::BinningOptions; -use crate::log::*; +use crate::log; use crate::timebin::fromevents::BinnedFromEvents; use crate::timebin::gapfill::GapFill; use crate::timebin::grid::find_next_finer_bin_len; @@ -26,7 +26,7 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } +macro_rules! trace_init { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } autoerr::create_error_v1!( name(Error, "TimeBinnedFromLayers"), @@ -160,7 +160,7 @@ impl TimeBinnedFromLayers { let inp = futures_util::stream::iter([]); let ret = Self { inp: Box::pin(inp) }; trace_init!("{}::new setup nothing", Self::type_name()); - info!("bin from events disabled on user request"); + trace_init!("bin from events disabled on user request"); Ok(ret) } } diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 73a7449..47cfe17 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -330,15 +330,7 @@ pub async fn timebinned_json( events_read_provider, ) .await?; - let collected = Collect::new( - stream, - deadline, - collect_max, - bytes_max, - None, - Some(binned_range), - timeout_provider, - ); + let collected = Collect::new(stream, deadline, collect_max, bytes_max, timeout_provider); let collected: BoxFuture<_> = Box::pin(collected); let collres = collected.await?; match collres {