From 8495853f8eb05ca609c767775146f1fd9e8c1b78 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 19 Jan 2023 20:05:25 +0100 Subject: [PATCH] Imagebuffer reads, time binning --- disk/src/disk.rs | 67 +++++++++++++++++++-- disk/src/eventblobs.rs | 6 +- disk/src/raw/conn.rs | 1 + httpret/src/api1.rs | 84 +++++++++++++++------------ httpret/src/api4/binned.rs | 13 ++++- httpret/src/api4/status.rs | 4 +- httpret/src/channel_status.rs | 36 ++++++++---- httpret/src/events.rs | 5 +- httpret/src/proxy.rs | 2 +- httpret/src/proxy/api4.rs | 4 +- httpret/src/proxy/api4/caioclookup.rs | 54 +++++++++++++++++ items_0/src/items_0.rs | 2 + items_2/src/binsdim0.rs | 42 +++++++++----- items_2/src/binsxbindim0.rs | 42 +++++++++----- items_2/src/channelevents.rs | 7 +++ items_2/src/eventsdim0.rs | 28 +++++---- items_2/src/eventsxbindim0.rs | 5 +- items_2/src/items_2.rs | 2 +- items_2/src/timebin.rs | 6 +- netpod/src/netpod.rs | 7 ++- netpod/src/query/api1.rs | 4 ++ scyllaconn/src/status.rs | 24 ++++---- streams/src/collect.rs | 2 +- streams/src/plaineventsjson.rs | 1 + streams/src/timebin.rs | 17 +++++- streams/src/timebinnedjson.rs | 3 +- 26 files changed, 341 insertions(+), 127 deletions(-) create mode 100644 httpret/src/proxy/api4/caioclookup.rs diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 3c02c17..a0370b1 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -17,11 +17,12 @@ pub mod read3; pub mod read4; pub mod streamlog; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; +use bytes::BytesMut; use err::Error; use futures_core::Stream; use futures_util::future::FusedFuture; -use futures_util::{FutureExt, TryFutureExt}; +use futures_util::{FutureExt, StreamExt, TryFutureExt}; use netpod::log::*; use netpod::ReadSys; use netpod::{ChannelConfig, DiskIoTune, Node, Shape}; @@ -32,12 +33,15 @@ use std::mem; use std::os::unix::prelude::AsRawFd; use std::path::PathBuf; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; use std::time::Instant; use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; use streams::filechunkread::FileChunkRead; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, ReadBuf}; +use tokio::fs::File; +use tokio::fs::OpenOptions; +use tokio::io::ReadBuf; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt}; use tokio::sync::mpsc; // TODO transform this into a self-test or remove. 
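// Context for the "Imagebuffer reads" half of this patch: ReadSys gains a fifth read
// strategy (Read5 / FileContentStream5, added in the next hunk), where a spawned task
// reads fixed 256 KiB chunks and hands them to the consumer over a bounded
// async_channel, so backpressure comes from channel capacity rather than the reader.
// A condensed, self-contained sketch of that shape; the `Chunk` alias is illustrative,
// not from the patch, and it assumes the tokio and async-channel crates:

use tokio::io::AsyncReadExt;

type Chunk = Vec<u8>;

fn spawn_chunk_reader(mut file: tokio::fs::File) -> async_channel::Receiver<std::io::Result<Chunk>> {
    let (tx, rx) = async_channel::bounded(32);
    tokio::spawn(async move {
        loop {
            let mut buf = vec![0u8; 1024 * 256];
            match file.read(&mut buf).await {
                // A zero-length read means EOF; dropping tx closes the channel,
                // which ends the consumer-side stream.
                Ok(0) => break,
                Ok(n) => {
                    buf.truncate(n);
                    if tx.send(Ok(buf)).await.is_err() {
                        break; // receiver dropped, stop reading
                    }
                }
                Err(e) => {
                    let _ = tx.send(Err(e)).await;
                    break;
                }
            }
        }
    });
    rx
}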
@@ -230,6 +234,57 @@ impl Stream for FileContentStream {
     }
 }
 
+fn start_read5(file: File, tx: async_channel::Sender<Result<FileChunkRead, Error>>) -> Result<(), Error> {
+    let fut = async move {
+        info!("start_read5 BEGIN");
+        let mut file = file;
+        loop {
+            let mut buf = BytesMut::new();
+            buf.resize(1024 * 256, 0);
+            match file.read(&mut buf).await {
+                // A zero-length read signals EOF; break here so the loop does not
+                // spin, and so that dropping `tx` closes the channel and ends the stream.
+                Ok(0) => break,
+                Ok(n) => {
+                    buf.truncate(n);
+                    let item = FileChunkRead::with_buf(buf);
+                    match tx.send(Ok(item)).await {
+                        Ok(()) => {}
+                        Err(_e) => break,
+                    }
+                }
+                Err(e) => match tx.send(Err(e.into())).await {
+                    Ok(()) => {}
+                    Err(_e) => break,
+                },
+            }
+        }
+        info!("start_read5 DONE");
+    };
+    tokio::task::spawn(fut);
+    Ok(())
+}
+
+pub struct FileContentStream5 {
+    rx: async_channel::Receiver<Result<FileChunkRead, Error>>,
+}
+
+impl FileContentStream5 {
+    pub fn new(file: File, _disk_io_tune: DiskIoTune) -> Result<Self, Error> {
+        let (tx, rx) = async_channel::bounded(32);
+        start_read5(file, tx)?;
+        let ret = Self { rx };
+        Ok(ret)
+    }
+}
+
+impl Stream for FileContentStream5 {
+    type Item = Result<FileChunkRead, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        self.rx.poll_next_unpin(cx)
+    }
+}
+
 enum FCS2 {
     Idle,
     Reading(
@@ -622,6 +675,10 @@ pub fn file_content_stream(
             let s = FileContentStream4::new(file, disk_io_tune);
             Box::pin(s) as _
         }
+        ReadSys::Read5 => {
+            let s = FileContentStream5::new(file, disk_io_tune).unwrap();
+            Box::pin(s) as _
+        }
     }
 }
 
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index 25e6dcb..87880ca 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -47,6 +47,7 @@ impl EventChunkerMultifile {
         expand: bool,
         do_decompress: bool,
     ) -> Self {
+        info!("EventChunkerMultifile do_decompress {do_decompress}");
         let file_chan = if expand {
             open_expanded_files(&range, &channel_config, node)
         } else {
@@ -186,8 +187,11 @@ impl Stream for EventChunkerMultifile {
                     let item = LogItem::quick(Level::INFO, msg);
                     Ready(Some(Ok(StreamItem::Log(item))))
                 } else {
-                    let msg = format!("handle OFS MERGED {:?}", ofs);
+                    let msg = format!("handle OFS MERGED timebin {}", ofs.timebin);
                     info!("{}", msg);
+                    for x in &ofs.files {
+                        info!("  path {:?}", x.path);
+                    }
                     let item = LogItem::quick(Level::INFO, msg);
                     let mut chunkers = vec![];
                     for of in ofs.files {
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index fe94d40..fe221df 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -353,6 +353,7 @@ pub async fn make_event_blobs_pipe(
     evq: &PlainEventsQuery,
     node_config: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
+    info!("make_event_blobs_pipe {evq:?}");
     if false {
         match dbconn::channel_exists(evq.channel(), &node_config).await {
             Ok(_) => (),
diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs
index 5da06df..6c440b4 100644
--- a/httpret/src/api1.rs
+++ b/httpret/src/api1.rs
@@ -611,7 +611,6 @@ pub struct DataApiPython3DataStream {
     chan_stream: Option> + Send>>>,
     config_fut: Option> + Send>>>,
     disk_io_tune: DiskIoTune,
-    #[allow(unused)]
     do_decompress: bool,
     #[allow(unused)]
     event_count: u64,
@@ -696,12 +695,13 @@ impl DataApiPython3DataStream {
                             compression,
                         };
                         let h = serde_json::to_string(&head)?;
-                        debug!("sending channel header {}", h);
+                        info!("sending channel header {}", h);
                         let l1 = 1 + h.as_bytes().len() as u32;
                         d.put_u32(l1);
                         d.put_u8(0);
-                        debug!("header frame byte len {}", 4 + 1 + h.as_bytes().len());
+                        info!("header frame byte len {}", 4 + 1 + h.as_bytes().len());
                         d.extend_from_slice(h.as_bytes());
+                        d.put_u32(l1);
                         *header_out = true;
                     }
                     match &b.shapes[i1] {
@@ -712,6 +712,7 @@ impl DataApiPython3DataStream {
                         d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]); d.put_slice(&b.blobs[i1]); + d.put_u32(l1); } } *count_events += 1; @@ -806,7 +807,7 @@ impl Stream for DataApiPython3DataStream { evq.channel().clone(), &entry, evq.agg_kind().need_expand(), - true, + self.do_decompress, event_chunker_conf, self.disk_io_tune.clone(), &self.node_config, @@ -937,8 +938,16 @@ impl Api1EventsBinaryHandler { .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))? .to_owned(); let body_data = hyper::body::to_bytes(body).await?; - let qu: Api1Query = match serde_json::from_slice(&body_data) { - Ok(qu) => qu, + if body_data.len() < 512 && body_data.first() == Some(&"{".as_bytes()[0]) { + info!("request body_data string: {}", String::from_utf8_lossy(&body_data)); + } + let qu = match serde_json::from_slice::(&body_data) { + Ok(mut qu) => { + if node_config.node_config.cluster.is_central_storage { + qu.set_decompress(false); + } + qu + } Err(e) => { error!("got body_data: {:?}", String::from_utf8_lossy(&body_data[..])); error!("can not parse: {e}"); @@ -976,42 +985,43 @@ impl Api1EventsBinaryHandler { trace!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date); //let url = Url::parse(&format!("dummy:{}", req.uri()))?; //let query = PlainEventsBinaryQuery::from_url(&url)?; - if accept != APP_OCTET && accept != ACCEPT_ALL { + if accept.contains(APP_OCTET) || accept.contains(ACCEPT_ALL) { + let beg = beg_date.timestamp() as u64 * SEC + beg_date.timestamp_subsec_nanos() as u64; + let end = end_date.timestamp() as u64 * SEC + end_date.timestamp_subsec_nanos() as u64; + let range = NanoRange { beg, end }; + // TODO check for valid given backend name: + let backend = &node_config.node_config.cluster.backend; + let chans = qu + .channels() + .iter() + .map(|ch| Channel { + backend: backend.into(), + name: ch.name().into(), + series: None, + }) + .collect(); + // TODO use a better stream protocol with built-in error delivery. + let status_id = super::status_board()?.new_status_id(); + let s = DataApiPython3DataStream::new( + range.clone(), + chans, + qu.disk_io_tune().clone(), + qu.decompress(), + qu.events_max().unwrap_or(u64::MAX), + status_id.clone(), + node_config.clone(), + ); + let s = s.instrument(span); + let body = BodyStream::wrapped(s, format!("Api1EventsBinaryHandler")); + let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", status_id); + let ret = ret.body(body)?; + Ok(ret) + } else { // TODO set the public error code and message and return Err(e). let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept)); error!("{e:?}"); return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } - let beg = beg_date.timestamp() as u64 * SEC + beg_date.timestamp_subsec_nanos() as u64; - let end = end_date.timestamp() as u64 * SEC + end_date.timestamp_subsec_nanos() as u64; - let range = NanoRange { beg, end }; - // TODO check for valid given backend name: - let backend = &node_config.node_config.cluster.backend; - let chans = qu - .channels() - .iter() - .map(|ch| Channel { - backend: backend.into(), - name: ch.name().into(), - series: None, - }) - .collect(); - // TODO use a better stream protocol with built-in error delivery. 
- let status_id = super::status_board()?.new_status_id(); - let s = DataApiPython3DataStream::new( - range.clone(), - chans, - qu.disk_io_tune().clone(), - qu.decompress(), - qu.events_max().unwrap_or(u64::MAX), - status_id.clone(), - node_config.clone(), - ); - let s = s.instrument(span); - let body = BodyStream::wrapped(s, format!("Api1EventsBinaryHandler")); - let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", status_id); - let ret = ret.body(body)?; - Ok(ret) } } diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index 9db0347..8a6e5dc 100644 --- a/httpret/src/api4/binned.rs +++ b/httpret/src/api4/binned.rs @@ -18,6 +18,7 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache debug!("httpret plain_events_json req: {:?}", req); let (_head, _body) = req.into_parts(); let query = BinnedQuery::from_url(&url).map_err(|e| { + error!("binned_json: {e:?}"); let msg = format!("can not parse query: {}", e.msg()); e.add_public_msg(msg) })?; @@ -56,6 +57,13 @@ async fn binned(req: Request, node_config: &NodeConfigCached) -> Result Ok(ret), - Err(e) => Ok(e.to_public_response()), + Err(e) => { + warn!("BinnedHandler handle sees: {e}"); + Ok(e.to_public_response()) + } } } } diff --git a/httpret/src/api4/status.rs b/httpret/src/api4/status.rs index 3429e65..b2f816d 100644 --- a/httpret/src/api4/status.rs +++ b/httpret/src/api4/status.rs @@ -44,7 +44,7 @@ impl StatusNodesRecursive { let res = match res { Ok(res) => res, Err(e) => { - let e = Error::from(e); + let e = Error::from(e).add_public_msg("see timeout"); return Ok(crate::bodystream::ToPublicResponse::to_public_response(&e)); } }; @@ -55,7 +55,7 @@ impl StatusNodesRecursive { Ok(ret) } Err(e) => { - error!("{e}"); + error!("StatusNodesRecursive sees: {e}"); let ret = crate::bodystream::ToPublicResponse::to_public_response(&e); Ok(ret) } diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index 12103f7..a773e71 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -2,11 +2,18 @@ use crate::bodystream::response; use crate::err::Error; use crate::ReqCtx; use futures_util::StreamExt; -use http::{Method, Request, Response, StatusCode}; +use http::Method; +use http::Request; +use http::Response; +use http::StatusCode; use hyper::Body; use items_2::channelevents::ConnStatusEvent; +use netpod::log::*; use netpod::query::ChannelStateEventsQuery; -use netpod::{FromUrl, NodeConfigCached, ACCEPT_ALL, APP_JSON}; +use netpod::FromUrl; +use netpod::NodeConfigCached; +use netpod::ACCEPT_ALL; +use netpod::APP_JSON; use url::Url; pub struct ConnectionStatusEvents {} @@ -32,7 +39,7 @@ impl ConnectionStatusEvents { .headers() .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept == APP_JSON || accept == ACCEPT_ALL { + if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelStateEventsQuery::from_url(&url)?; match self.fetch_data(&q, node_config).await { @@ -40,8 +47,11 @@ impl ConnectionStatusEvents { let body = Body::from(serde_json::to_vec(&k)?); Ok(response(StatusCode::OK).body(body)?) } - Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(format!("{:?}", e.public_msg())))?), + Err(e) => { + error!("{e}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("{:?}", e.public_msg())))?) + } } } else { Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) 
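// Note on the Accept matching above: this patch relaxes `accept == APP_JSON || accept == ACCEPT_ALL`
// to `accept.contains(...)` because clients commonly send composite headers such as
// "application/json, */*;q=0.5", which an exact comparison rejects. A minimal sketch of
// both variants; `accepts_json_strict` is illustrative only, not part of the patch:

const APP_JSON: &str = "application/json";
const ACCEPT_ALL: &str = "*/*";

fn accepts_json_loose(accept: &str) -> bool {
    // What the patch does: substring match anywhere in the header value.
    accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL)
}

fn accepts_json_strict(accept: &str) -> bool {
    // Slightly stricter: compare the media type of each comma-separated part,
    // ignoring quality parameters like ";q=0.9".
    accept
        .split(',')
        .map(|part| part.split(';').next().unwrap_or("").trim())
        .any(|mt| mt == APP_JSON || mt == ACCEPT_ALL)
}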
@@ -61,7 +71,7 @@ impl ConnectionStatusEvents { .cluster .scylla .as_ref() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; + .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let scy = scyllaconn::create_scy_session(scyco).await?; let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?; let series = chconf.series; @@ -100,7 +110,7 @@ impl ChannelStatusEvents { .headers() .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept == APP_JSON || accept == ACCEPT_ALL { + if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelStateEventsQuery::from_url(&url)?; match self.fetch_data(&q, node_config).await { @@ -108,8 +118,11 @@ impl ChannelStatusEvents { let body = Body::from(serde_json::to_vec(&k)?); Ok(response(StatusCode::OK).body(body)?) } - Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(format!("{:?}", e.public_msg())))?), + Err(e) => { + error!("{e}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("{:?}", e.public_msg())))?) + } } } else { Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) @@ -129,13 +142,12 @@ impl ChannelStatusEvents { .cluster .scylla .as_ref() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; + .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let scy = scyllaconn::create_scy_session(scyco).await?; let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?; - let series = chconf.series; let do_one_before_range = true; let mut stream = - scyllaconn::status::StatusStreamScylla::new(series, q.range().clone(), do_one_before_range, scy); + scyllaconn::status::StatusStreamScylla::new(chconf.series, q.range().clone(), do_one_before_range, scy); let mut ret = Vec::new(); while let Some(item) = stream.next().await { let item = item?; diff --git a/httpret/src/events.rs b/httpret/src/events.rs index aa90377..9976391 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -28,7 +28,10 @@ impl EventsHandler { } match plain_events(req, node_config).await { Ok(ret) => Ok(ret), - Err(e) => Ok(e.to_public_response()), + Err(e) => { + error!("EventsHandler sees {e}"); + Ok(e.to_public_response()) + } } } } diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 5ddc820..c63011a 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -513,7 +513,7 @@ where return Ok(res); } Err(e) => { - warn!("{e}"); + warn!("FT sees: {e}"); let res = crate::bodystream::ToPublicResponse::to_public_response(&e); return Ok(res); } diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs index c9bbd39..4a5a3be 100644 --- a/httpret/src/proxy/api4.rs +++ b/httpret/src/proxy/api4.rs @@ -1,3 +1,5 @@ +pub mod caioclookup; + use crate::bodystream::ToPublicResponse; use crate::err::Error; use crate::gather::gather_get_json_generic; @@ -169,7 +171,7 @@ impl StatusNodesRecursive { Ok(ret) } Err(e) => { - error!("{e}"); + error!("StatusNodesRecursive sees: {e}"); let ret = crate::bodystream::ToPublicResponse::to_public_response(&e); Ok(ret) } diff --git a/httpret/src/proxy/api4/caioclookup.rs b/httpret/src/proxy/api4/caioclookup.rs new file mode 100644 index 0000000..105c654 --- /dev/null +++ b/httpret/src/proxy/api4/caioclookup.rs @@ -0,0 +1,54 @@ +use crate::bodystream::response; +use 
crate::err::Error;
+use crate::ReqCtx;
+use http::Request;
+use http::Response;
+use http::StatusCode;
+use hyper::Body;
+use netpod::log::*;
+use netpod::ProxyConfig;
+
+pub struct CaIocLookup {}
+
+impl CaIocLookup {
+    fn path() -> &'static str {
+        "/api/4/channel-access/search/addr"
+    }
+
+    pub fn handler(req: &Request<Body>) -> Option<Self> {
+        if req.uri().path() == Self::path() {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn handle(
+        &self,
+        req: Request<Body>,
+        ctx: &ReqCtx,
+        node_config: &ProxyConfig,
+    ) -> Result<Response<Body>, Error> {
+        match self.search(req, ctx, node_config).await {
+            Ok(status) => {
+                let body = serde_json::to_vec(&status)?;
+                let ret = response(StatusCode::OK).body(Body::from(body))?;
+                Ok(ret)
+            }
+            Err(e) => {
+                error!("sees: {e}");
+                let ret = crate::bodystream::ToPublicResponse::to_public_response(&e);
+                Ok(ret)
+            }
+        }
+    }
+
+    async fn search(
+        &self,
+        _req: Request<Body>,
+        _ctx: &ReqCtx,
+        _proxy_config: &ProxyConfig,
+    ) -> Result<Option<String>, Error> {
+        Ok(None)
+    }
+}
diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs
index f351fe3..d27d5bd 100644
--- a/items_0/src/items_0.rs
+++ b/items_0/src/items_0.rs
@@ -110,6 +110,8 @@ pub trait TimeBinner: Send {
     fn cycle(&mut self);
 
     fn set_range_complete(&mut self);
+
+    fn empty(&self) -> Box<dyn TimeBinned>;
 }
 
 // TODO remove the Any bound. Factor out into custom AsAny trait.
diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs
index 410c7c2..1c0d2aa 100644
--- a/items_2/src/binsdim0.rs
+++ b/items_2/src/binsdim0.rs
@@ -256,7 +256,7 @@ pub struct BinsDim0CollectedResult {
     #[serde(rename = "avgs")]
     avgs: VecDeque<f32>,
     #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
-    finalised_range: bool,
+    range_final: bool,
     #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
     timed_out: bool,
     #[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
@@ -309,7 +309,7 @@ impl BinsDim0CollectedResult {
     }
 
     pub fn range_final(&self) -> bool {
-        self.finalised_range
+        self.range_final
     }
 
     pub fn missing_bins(&self) -> u32 {
@@ -343,7 +343,7 @@ impl ToJsonResult for BinsDim0CollectedResult {
 #[derive(Debug)]
 pub struct BinsDim0Collector<NTY> {
     timed_out: bool,
-    range_complete: bool,
+    range_final: bool,
     vals: BinsDim0<NTY>,
 }
@@ -351,7 +351,7 @@ impl BinsDim0Collector {
     pub fn new() -> Self {
         Self {
             timed_out: false,
-            range_complete: false,
+            range_final: false,
             vals: BinsDim0::<NTY>::empty(),
         }
     }
@@ -379,7 +379,7 @@ impl CollectorType for BinsDim0Collector {
     }
 
     fn set_range_complete(&mut self) {
-        self.range_complete = true;
+        self.range_final = true;
     }
 
     fn set_timed_out(&mut self) {
@@ -393,16 +393,23 @@ impl CollectorType for BinsDim0Collector {
             0
         };
         let bin_count = self.vals.ts1s.len() as u32;
-        let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
-            match self.vals.ts2s.back() {
-                Some(&k) => {
-                    let missing_bins = bin_count_exp - bin_count;
-                    let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
-                    let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
-                    let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
-                    (missing_bins, Some(continue_at), Some(finished_at))
+        let (missing_bins, continue_at, finished_at) = if self.range_final {
+            if bin_count < bin_count_exp {
+                match self.vals.ts2s.back() {
+                    Some(&k) => {
+                        let missing_bins = bin_count_exp - bin_count;
+                        let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
+                        let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
+                        let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
+                        (missing_bins, Some(continue_at), Some(finished_at))
+                    }
+                    None => {
+                        warn!("can not determine continue-at parameters");
+                        (0, None, None)
+                    }
                 }
-                None => Err(Error::with_msg("partial_content but no bin in result"))?,
+            } else {
+                (0, None, None)
             }
         } else {
             (0, None, None)
@@ -429,7 +436,7 @@ impl CollectorType for BinsDim0Collector {
             mins,
             maxs,
             avgs,
-            finalised_range: self.range_complete,
+            range_final: self.range_final,
             timed_out: self.timed_out,
             missing_bins,
             continue_at,
@@ -769,6 +776,11 @@ impl TimeBinner for BinsDim0TimeBinner {
     }
 
     fn set_range_complete(&mut self) {}
+
+    fn empty(&self) -> Box<dyn TimeBinned> {
+        let ret = <BinsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();
+        Box::new(ret)
+    }
 }
 
 impl TimeBinned for BinsDim0 {
diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs
index 18f4b0a..f6990d4 100644
--- a/items_2/src/binsxbindim0.rs
+++ b/items_2/src/binsxbindim0.rs
@@ -261,7 +261,7 @@ pub struct BinsXbinDim0CollectedResult {
     #[serde(rename = "avgs")]
     avgs: VecDeque<f32>,
     #[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
-    finalised_range: bool,
+    range_final: bool,
     #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
     timed_out: bool,
     #[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
@@ -314,7 +314,7 @@ impl BinsXbinDim0CollectedResult {
     }
 
     pub fn range_final(&self) -> bool {
-        self.finalised_range
+        self.range_final
     }
 
     pub fn missing_bins(&self) -> u32 {
@@ -344,7 +344,7 @@ impl ToJsonResult for BinsXbinDim0CollectedResult {
 #[derive(Debug)]
 pub struct BinsXbinDim0Collector<NTY> {
     timed_out: bool,
-    range_complete: bool,
+    range_final: bool,
     vals: BinsXbinDim0<NTY>,
 }
@@ -352,7 +352,7 @@ impl BinsXbinDim0Collector {
     pub fn new() -> Self {
         Self {
             timed_out: false,
-            range_complete: false,
+            range_final: false,
             vals: BinsXbinDim0::<NTY>::empty(),
         }
     }
@@ -380,7 +380,7 @@ impl CollectorType for BinsXbinDim0Collector {
     }
 
     fn set_range_complete(&mut self) {
-        self.range_complete = true;
+        self.range_final = true;
     }
 
     fn set_timed_out(&mut self) {
@@ -394,16 +394,23 @@ impl CollectorType for BinsXbinDim0Collector {
             0
         };
         let bin_count = self.vals.ts1s.len() as u32;
-        let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
-            match self.vals.ts2s.back() {
-                Some(&k) => {
-                    let missing_bins = bin_count_exp - bin_count;
-                    let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
-                    let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
-                    let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
-                    (missing_bins, Some(continue_at), Some(finished_at))
+        let (missing_bins, continue_at, finished_at) = if self.range_final {
+            if bin_count < bin_count_exp {
+                match self.vals.ts2s.back() {
+                    Some(&k) => {
+                        let missing_bins = bin_count_exp - bin_count;
+                        let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
+                        let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
+                        let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
+                        (missing_bins, Some(continue_at), Some(finished_at))
+                    }
+                    None => {
+                        warn!("can not determine continue-at parameters");
+                        (0, None, None)
+                    }
                 }
-                None => Err(Error::with_msg("partial_content but no bin in result"))?,
+            } else {
+                (0, None, None)
             }
         } else {
             (0, None, None)
@@ -430,7 +437,7 @@ impl CollectorType for BinsXbinDim0Collector {
             mins,
             maxs,
             avgs,
-            finalised_range: self.range_complete,
+            range_final: self.range_final,
             timed_out: self.timed_out,
             missing_bins,
             continue_at,
@@ -770,6 +777,11 @@ impl TimeBinner for BinsXbinDim0TimeBinner {
     }
 
     fn set_range_complete(&mut self) {}
+
+    fn empty(&self) -> Box<dyn TimeBinned> {
+        let ret = <BinsXbinDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();
+        Box::new(ret)
+    }
 }
 
 impl TimeBinned for BinsXbinDim0 {
diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs
index 980e541..827fb29 100644
--- a/items_2/src/channelevents.rs
+++ b/items_2/src/channelevents.rs
@@ -576,6 +576,13 @@ impl crate::timebin::TimeBinner for ChannelEventsTimeBinner {
             None => (),
         }
     }
+
+    fn empty(&self) -> Option<Self::Output> {
+        match self.binner.as_ref() {
+            Some(binner) => Some(binner.empty()),
+            None => None,
+        }
+    }
 }
 
 impl crate::timebin::TimeBinnable for ChannelEvents {
diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs
index 19afaa9..5898454 100644
--- a/items_2/src/eventsdim0.rs
+++ b/items_2/src/eventsdim0.rs
@@ -13,6 +13,12 @@ use std::any::Any;
 use std::collections::VecDeque;
 use std::{fmt, mem};
 
+#[allow(unused)]
+macro_rules! trace2 {
+    (EN$($arg:tt)*) => ();
+    ($($arg:tt)*) => (trace!($($arg)*));
+}
+
 #[derive(Clone, PartialEq, Serialize, Deserialize)]
 pub struct EventsDim0<NTY> {
     pub tss: VecDeque<u64>,
@@ -286,10 +292,7 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector
 impl TimeBinner for EventsDim0TimeBinner {
     fn ingest(&mut self, item: &dyn TimeBinnable) {
         let self_name = std::any::type_name::<Self>();
-        if true {
-            trace!(
-                "TimeBinner for EventsDim0TimeBinner {:?}\n{:?}\n------------------------------------",
-                self.edges.iter().take(2).collect::<Vec<_>>(),
-                item
-            );
-        }
+        trace2!(
+            "TimeBinner for EventsDim0TimeBinner {:?}\n{:?}\n------------------------------------",
+            self.edges.iter().take(2).collect::<Vec<_>>(),
+            item
+        );
         if item.len() == 0 {
             // Return already here, RangeOverlapInfo would not give much sense.
             return;
         }
@@ -949,6 +950,11 @@ impl TimeBinner for EventsDim0TimeBinner {
     fn set_range_complete(&mut self) {
         self.range_complete = true;
     }
+
+    fn empty(&self) -> Box<dyn TimeBinned> {
+        let ret = <EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();
+        Box::new(ret)
+    }
 }
 
 // TODO remove this struct?
diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs
index efcf2eb..c4215cd 100644
--- a/items_2/src/eventsxbindim0.rs
+++ b/items_2/src/eventsxbindim0.rs
@@ -451,10 +451,7 @@ where
             if let Some(range) = &range {
                 Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC))
             } else {
-                // TODO tricky: should yield again the original range begin? Leads to recursion.
-                // Range begin plus delta?
-                // Anyway, we don't have the range begin here.
- warn!("timed out without any result, can not yield a continue-at"); + warn!("can not determine continue-at parameters"); None } } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 579cf36..18bde2e 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -191,7 +191,7 @@ impl crate::merger::Mergeable for Box { } // TODO rename to `Typed` -pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo { +pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo + Empty { type Output: TimeBinnableType; type Aggregator: TimeBinnableTypeAggregator + Send + Unpin; fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator; diff --git a/items_2/src/timebin.rs b/items_2/src/timebin.rs index 21d0550..6e2af73 100644 --- a/items_2/src/timebin.rs +++ b/items_2/src/timebin.rs @@ -1,8 +1,8 @@ use std::fmt; pub trait TimeBinner: fmt::Debug + Unpin { - type Input; - type Output; + type Input: fmt::Debug; + type Output: fmt::Debug; fn ingest(&mut self, item: &mut Self::Input); @@ -20,6 +20,8 @@ pub trait TimeBinner: fmt::Debug + Unpin { /// to `push_in_progress` did not change the result count, as long as edges are left. /// The next call to `Self::bins_ready_count` must return one higher count than before. fn cycle(&mut self); + + fn empty(&self) -> Option; } pub trait TimeBinnable: fmt::Debug + Sized { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index cab9b62..65f1b97 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1030,7 +1030,7 @@ impl Shape { pub fn to_scylla_vec(&self) -> Vec { use Shape::*; match self { - Scalar => vec![], + Scalar => Vec::new(), Wave(n) => vec![*n as i32], Image(n, m) => vec![*n as i32, *m as i32], } @@ -1870,11 +1870,12 @@ pub enum ReadSys { Read2, Read3, Read4, + Read5, } impl ReadSys { pub fn default() -> Self { - Self::TokioAsyncRead + Self::Read5 } } @@ -1888,6 +1889,8 @@ impl From<&str> for ReadSys { Self::Read3 } else if k == "Read4" { Self::Read4 + } else if k == "Read5" { + Self::Read5 } else { Self::default() } diff --git a/netpod/src/query/api1.rs b/netpod/src/query/api1.rs index 76fa6c0..74f9c59 100644 --- a/netpod/src/query/api1.rs +++ b/netpod/src/query/api1.rs @@ -286,6 +286,10 @@ impl Api1Query { pub fn events_max(&self) -> Option { self.events_max } + + pub fn set_decompress(&mut self, v: bool) { + self.decompress = v; + } } #[test] diff --git a/scyllaconn/src/status.rs b/scyllaconn/src/status.rs index f2fc4f6..32b8379 100644 --- a/scyllaconn/src/status.rs +++ b/scyllaconn/src/status.rs @@ -41,7 +41,7 @@ async fn read_next_status_events( ); // TODO use prepared! let cql = concat!( - "select ts_lsp, pulse, kind from channel_status where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" + "select ts_lsp, kind from channel_status where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" ); scy.query( cql, @@ -61,7 +61,7 @@ async fn read_next_status_events( ); // TODO use prepared! let cql = concat!( - "select ts_lsp, pulse, kind from channel_status where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" + "select ts_lsp, kind from channel_status where series = ? and ts_msp = ? and ts_lsp < ? 
order by ts_lsp desc limit 1" ); scy.query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) .await @@ -69,11 +69,10 @@ async fn read_next_status_events( }; let mut last_before = None; let mut ret = VecDeque::new(); - for row in res.rows_typed_or_empty::<(i64, i64, i32)>() { + for row in res.rows_typed_or_empty::<(i64, i32)>() { let row = row.err_conv()?; let ts = ts_msp + row.0 as u64; - let _pulse = row.1 as u64; - let kind = row.2 as u32; + let kind = row.1 as u32; // from netfetch::store::ChannelStatus let ev = ConnStatusEvent { ts, @@ -132,9 +131,10 @@ impl ReadValues { fn next(&mut self) -> bool { if let Some(ts_msp) = self.ts_msps.pop_front() { - self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 0); + self.fut = self.make_fut(ts_msp); true } else { + info!("no more msp"); false } } @@ -142,8 +142,8 @@ impl ReadValues { fn make_fut( &mut self, ts_msp: u64, - _has_more_msp: bool, ) -> Pin, Error>> + Send>> { + info!("make fut for {ts_msp}"); let fut = read_next_status_events( self.series, ts_msp, @@ -168,7 +168,6 @@ pub struct StatusStreamScylla { range: NanoRange, do_one_before_range: bool, scy: Arc, - ts_msps: VecDeque, outbuf: VecDeque, } @@ -180,7 +179,6 @@ impl StatusStreamScylla { range, do_one_before_range, scy, - ts_msps: VecDeque::new(), outbuf: VecDeque::new(), } } @@ -202,13 +200,14 @@ impl Stream for StatusStreamScylla { let mut ts_msps = VecDeque::new(); let mut ts = self.range.beg / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; while ts < self.range.end { + info!("Use ts {ts}"); ts_msps.push_back(ts); ts += CONNECTION_STATUS_DIV; } let st = ReadValues::new( self.series, self.range.clone(), - self.ts_msps.clone(), + ts_msps, true, self.do_one_before_range, self.scy.clone(), @@ -227,7 +226,10 @@ impl Stream for StatusStreamScylla { } continue; } - Ready(Err(e)) => Ready(Some(Err(e))), + Ready(Err(e)) => { + error!("{e}"); + Ready(Some(Err(e))) + } Pending => Pending, }, FrState::Done => Ready(None), diff --git a/streams/src/collect.rs b/streams/src/collect.rs index bf196a3..6a3820b 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -57,7 +57,7 @@ where break; } }; - trace!("collect_in_span see item"); + trace!("collect_in_span see item {item:?}"); match item { Ok(item) => match item { StreamItem::DataItem(item) => match item { diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 9374b49..4c7d072 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -36,6 +36,7 @@ pub async fn plain_events_json( let stream = inp0.chain(inp1).chain(inp2); stream }; + netpod::log::info!("plain_events_json with empty item {empty:?}"); let stream = { items_2::merger::Merger::new(inps, 1) }; let stream = stream::iter([empty]).chain(stream); let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?; diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index 1a4d938..97fca8d 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -81,11 +81,14 @@ where } fn process_item(&mut self, mut item: T) -> () { + trace!("process_item {item:?}"); if self.binner.is_none() { + trace!("process_item call time_binner_new"); let binner = item.time_binner_new(self.edges.clone(), self.do_time_weight); self.binner = Some(binner); } let binner = self.binner.as_mut().unwrap(); + trace!("process_item call binner ingest"); binner.ingest(&mut item); } } @@ -198,9 +201,17 @@ where Ready(Some(Err(e))) } } else { - trace2!("no bins ready yet"); - self.done_data = 
true; - continue; + if let Some(bins) = binner.empty() { + trace!("at end of stream, bin count zero, return {bins:?}"); + self.done_data = true; + Ready(Some(sitem_data(bins))) + } else { + error!("at the end, no bins, can not get empty"); + self.done_data = true; + let e = Error::with_msg_no_trace(format!("no bins")) + .add_public_msg(format!("unable to produce bins")); + Ready(Some(Err(e))) + } } } else { trace2!("input stream finished, still no binner"); diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index a28f1b2..eae8c74 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -35,7 +35,8 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu ); let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&rawquery, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader: - let stream = { items_2::merger::Merger::new(inps, 128) }; + netpod::log::info!("timebinned_json with empty item {empty:?}"); + let stream = items_2::merger::Merger::new(inps, 128); let stream = stream::iter([empty]).chain(stream); let stream = Box::pin(stream); let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline);
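// The time-binning half of this patch hinges on the new `TimeBinner::empty` method:
// when the merged input stream ends before any bin is ready, streams/src/timebin.rs
// now emits a well-formed empty bins container instead of ending with no data item.
// A condensed sketch of that end-of-stream contract, with simplified stand-in types
// (the real traits live in items_0 and items_2):

trait Bins: std::fmt::Debug {}

trait TimeBinner {
    fn bins_ready_count(&self) -> usize;
    fn take_ready_bins(&mut self) -> Box<dyn Bins>;
    // Added by this patch: a valid, zero-length bins value of the right concrete type.
    fn empty(&self) -> Box<dyn Bins>;
}

fn finish(binner: &mut dyn TimeBinner) -> Box<dyn Bins> {
    if binner.bins_ready_count() > 0 {
        binner.take_ready_bins()
    } else {
        // End of stream with zero ready bins: hand the caller an empty result
        // so a JSON response can still be rendered for the client.
        binner.empty()
    }
}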