Imagebuffer reads, time binning

Author: Dominik Werder
Date: 2023-01-19 20:05:25 +01:00
parent 9c68476626
commit 8495853f8e
26 changed files with 341 additions and 127 deletions

View File

@@ -17,11 +17,12 @@ pub mod read3;
pub mod read4;
pub mod streamlog;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use bytes::BytesMut;
use err::Error;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use futures_util::{FutureExt, TryFutureExt};
use futures_util::{FutureExt, StreamExt, TryFutureExt};
use netpod::log::*;
use netpod::ReadSys;
use netpod::{ChannelConfig, DiskIoTune, Node, Shape};
@@ -32,12 +33,15 @@ use std::mem;
use std::os::unix::prelude::AsRawFd;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use streams::filechunkread::FileChunkRead;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, ReadBuf};
use tokio::fs::File;
use tokio::fs::OpenOptions;
use tokio::io::ReadBuf;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
use tokio::sync::mpsc;
// TODO transform this into a self-test or remove.
@@ -230,6 +234,55 @@ impl Stream for FileContentStream {
}
}
fn start_read5(file: File, tx: async_channel::Sender<Result<FileChunkRead, Error>>) -> Result<(), Error> {
let fut = async move {
info!("start_read5 BEGIN");
let mut file = file;
loop {
let mut buf = BytesMut::new();
buf.resize(1024 * 256, 0);
match file.read(&mut buf).await {
Ok(n) => {
buf.truncate(n);
let item = FileChunkRead::with_buf(buf);
match tx.send(Ok(item)).await {
Ok(()) => {}
Err(_e) => break,
}
}
Err(e) => match tx.send(Err(e.into())).await {
Ok(()) => {}
Err(_e) => break,
},
}
}
info!("start_read5 DONE");
};
tokio::task::spawn(fut);
Ok(())
}
pub struct FileContentStream5 {
rx: async_channel::Receiver<Result<FileChunkRead, Error>>,
}
impl FileContentStream5 {
pub fn new(file: File, _disk_io_tune: DiskIoTune) -> Result<Self, Error> {
let (tx, rx) = async_channel::bounded(32);
start_read5(file, tx)?;
let ret = Self { rx };
Ok(ret)
}
}
impl Stream for FileContentStream5 {
type Item = Result<FileChunkRead, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.rx.poll_next_unpin(cx)
}
}
enum FCS2 {
Idle,
Reading(
@@ -622,6 +675,10 @@ pub fn file_content_stream(
let s = FileContentStream4::new(file, disk_io_tune);
Box::pin(s) as _
}
ReadSys::Read5 => {
let s = FileContentStream5::new(file, disk_io_tune).unwrap();
Box::pin(s) as _
}
}
}
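
The new Read5 reader spawns a task that reads fixed-size chunks and pushes them through a bounded async_channel; FileContentStream5 then exposes the receiver as a Stream by forwarding poll_next. Below is a minimal, self-contained sketch of the same pattern using only tokio, bytes and async_channel; it yields plain BytesMut chunks instead of the crate's FileChunkRead and adds an explicit break on EOF for self-containment.

// Sketch of the start_read5 pattern: a reader task sends 256 KiB chunks over a
// bounded channel; the channel capacity provides backpressure.
use bytes::BytesMut;
use tokio::fs::File;
use tokio::io::AsyncReadExt;

fn spawn_chunk_reader(mut file: File) -> async_channel::Receiver<Result<BytesMut, std::io::Error>> {
    let (tx, rx) = async_channel::bounded(32);
    tokio::task::spawn(async move {
        loop {
            let mut buf = BytesMut::new();
            buf.resize(1024 * 256, 0);
            match file.read(&mut buf).await {
                // EOF: break explicitly in this sketch
                Ok(0) => break,
                Ok(n) => {
                    buf.truncate(n);
                    if tx.send(Ok(buf)).await.is_err() {
                        // receiver side dropped, stop reading
                        break;
                    }
                }
                Err(e) => {
                    let _ = tx.send(Err(e)).await;
                    break;
                }
            }
        }
    });
    rx
}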

View File

@@ -47,6 +47,7 @@ impl EventChunkerMultifile {
expand: bool,
do_decompress: bool,
) -> Self {
info!("EventChunkerMultifile do_decompress {do_decompress}");
let file_chan = if expand {
open_expanded_files(&range, &channel_config, node)
} else {
@@ -186,8 +187,11 @@ impl Stream for EventChunkerMultifile {
let item = LogItem::quick(Level::INFO, msg);
Ready(Some(Ok(StreamItem::Log(item))))
} else {
let msg = format!("handle OFS MERGED {:?}", ofs);
let msg = format!("handle OFS MERGED timebin {}", ofs.timebin);
info!("{}", msg);
for x in &ofs.files {
info!(" path {:?}", x.path);
}
let item = LogItem::quick(Level::INFO, msg);
let mut chunkers = vec![];
for of in ofs.files {

View File

@@ -353,6 +353,7 @@ pub async fn make_event_blobs_pipe(
evq: &PlainEventsQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>>, Error> {
info!("make_event_blobs_pipe {evq:?}");
if false {
match dbconn::channel_exists(evq.channel(), &node_config).await {
Ok(_) => (),

View File

@@ -611,7 +611,6 @@ pub struct DataApiPython3DataStream {
chan_stream: Option<Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>>,
config_fut: Option<Pin<Box<dyn Future<Output = Result<Config, Error>> + Send>>>,
disk_io_tune: DiskIoTune,
#[allow(unused)]
do_decompress: bool,
#[allow(unused)]
event_count: u64,
@@ -696,12 +695,13 @@ impl DataApiPython3DataStream {
compression,
};
let h = serde_json::to_string(&head)?;
debug!("sending channel header {}", h);
info!("sending channel header {}", h);
let l1 = 1 + h.as_bytes().len() as u32;
d.put_u32(l1);
d.put_u8(0);
debug!("header frame byte len {}", 4 + 1 + h.as_bytes().len());
info!("header frame byte len {}", 4 + 1 + h.as_bytes().len());
d.extend_from_slice(h.as_bytes());
d.put_u32(l1);
*header_out = true;
}
match &b.shapes[i1] {
@@ -712,6 +712,7 @@ impl DataApiPython3DataStream {
d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]);
d.put_slice(&b.blobs[i1]);
d.put_u32(l1);
}
}
*count_events += 1;
@@ -806,7 +807,7 @@ impl Stream for DataApiPython3DataStream {
evq.channel().clone(),
&entry,
evq.agg_kind().need_expand(),
true,
self.do_decompress,
event_chunker_conf,
self.disk_io_tune.clone(),
&self.node_config,
@@ -937,8 +938,16 @@ impl Api1EventsBinaryHandler {
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
.to_owned();
let body_data = hyper::body::to_bytes(body).await?;
let qu: Api1Query = match serde_json::from_slice(&body_data) {
Ok(qu) => qu,
if body_data.len() < 512 && body_data.first() == Some(&"{".as_bytes()[0]) {
info!("request body_data string: {}", String::from_utf8_lossy(&body_data));
}
let qu = match serde_json::from_slice::<Api1Query>(&body_data) {
Ok(mut qu) => {
if node_config.node_config.cluster.is_central_storage {
qu.set_decompress(false);
}
qu
}
Err(e) => {
error!("got body_data: {:?}", String::from_utf8_lossy(&body_data[..]));
error!("can not parse: {e}");
@@ -976,42 +985,43 @@ impl Api1EventsBinaryHandler {
trace!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date);
//let url = Url::parse(&format!("dummy:{}", req.uri()))?;
//let query = PlainEventsBinaryQuery::from_url(&url)?;
if accept != APP_OCTET && accept != ACCEPT_ALL {
if accept.contains(APP_OCTET) || accept.contains(ACCEPT_ALL) {
let beg = beg_date.timestamp() as u64 * SEC + beg_date.timestamp_subsec_nanos() as u64;
let end = end_date.timestamp() as u64 * SEC + end_date.timestamp_subsec_nanos() as u64;
let range = NanoRange { beg, end };
// TODO check for valid given backend name:
let backend = &node_config.node_config.cluster.backend;
let chans = qu
.channels()
.iter()
.map(|ch| Channel {
backend: backend.into(),
name: ch.name().into(),
series: None,
})
.collect();
// TODO use a better stream protocol with built-in error delivery.
let status_id = super::status_board()?.new_status_id();
let s = DataApiPython3DataStream::new(
range.clone(),
chans,
qu.disk_io_tune().clone(),
qu.decompress(),
qu.events_max().unwrap_or(u64::MAX),
status_id.clone(),
node_config.clone(),
);
let s = s.instrument(span);
let body = BodyStream::wrapped(s, format!("Api1EventsBinaryHandler"));
let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", status_id);
let ret = ret.body(body)?;
Ok(ret)
} else {
// TODO set the public error code and message and return Err(e).
let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
error!("{e:?}");
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
let beg = beg_date.timestamp() as u64 * SEC + beg_date.timestamp_subsec_nanos() as u64;
let end = end_date.timestamp() as u64 * SEC + end_date.timestamp_subsec_nanos() as u64;
let range = NanoRange { beg, end };
// TODO check for valid given backend name:
let backend = &node_config.node_config.cluster.backend;
let chans = qu
.channels()
.iter()
.map(|ch| Channel {
backend: backend.into(),
name: ch.name().into(),
series: None,
})
.collect();
// TODO use a better stream protocol with built-in error delivery.
let status_id = super::status_board()?.new_status_id();
let s = DataApiPython3DataStream::new(
range.clone(),
chans,
qu.disk_io_tune().clone(),
qu.decompress(),
qu.events_max().unwrap_or(u64::MAX),
status_id.clone(),
node_config.clone(),
);
let s = s.instrument(span);
let body = BodyStream::wrapped(s, format!("Api1EventsBinaryHandler"));
let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", status_id);
let ret = ret.body(body)?;
Ok(ret)
}
}
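
The Accept check above was inverted and relaxed: instead of requiring an exact header value, the header only needs to contain the expected media type or the wildcard, so composite values such as "application/octet-stream, */*" now pass. A small standalone sketch of that check (the constants are local stand-ins for netpod's APP_OCTET and ACCEPT_ALL):

// Sketch of the relaxed Accept handling used by the handlers in this commit.
const APP_OCTET: &str = "application/octet-stream";
const ACCEPT_ALL: &str = "*/*";

fn accept_allows(accept_header: &str, media: &str) -> bool {
    accept_header.contains(media) || accept_header.contains(ACCEPT_ALL)
}

#[test]
fn accept_allows_lists() {
    assert!(accept_allows("application/octet-stream, */*", APP_OCTET));
    assert!(accept_allows("*/*", APP_OCTET));
    assert!(!accept_allows("text/html", APP_OCTET));
}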

View File

@@ -18,6 +18,7 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
debug!("httpret plain_events_json req: {:?}", req);
let (_head, _body) = req.into_parts();
let query = BinnedQuery::from_url(&url).map_err(|e| {
error!("binned_json: {e:?}");
let msg = format!("can not parse query: {}", e.msg());
e.add_public_msg(msg)
})?;
@@ -56,6 +57,13 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Re
.map_err(Error::from)
.map_err(|e| e.add_public_msg(format!("Can not parse query url")))?
};
if req
.uri()
.path_and_query()
.map_or(false, |x| x.as_str().contains("DOERR"))
{
Err(Error::with_msg_no_trace("hidden message").add_public_msg("PublicMessage"))?;
}
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
Ok(binned_json(url, req, node_config).await?)
} else if accept == APP_OCTET {
@@ -86,7 +94,10 @@ impl BinnedHandler {
}
match binned(req, node_config).await {
Ok(ret) => Ok(ret),
Err(e) => Ok(e.to_public_response()),
Err(e) => {
warn!("BinnedHandler handle sees: {e}");
Ok(e.to_public_response())
}
}
}
}
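
The DOERR check above is a small test hook for the public-error path: a request whose path or query contains the marker fails with a hidden message (kept for the logs) and a public message (returned to the client). A simplified sketch of the guard with a stand-in error type:

// Sketch of the DOERR hook; the real code uses the crate's Error with
// with_msg_no_trace(..).add_public_msg(..).
#[derive(Debug)]
struct PublicError {
    hidden: String,
    public: String,
}

fn maybe_inject_error(path_and_query: &str) -> Result<(), PublicError> {
    if path_and_query.contains("DOERR") {
        Err(PublicError {
            hidden: "hidden message".to_string(),
            public: "PublicMessage".to_string(),
        })
    } else {
        Ok(())
    }
}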

View File

@@ -44,7 +44,7 @@ impl StatusNodesRecursive {
let res = match res {
Ok(res) => res,
Err(e) => {
let e = Error::from(e);
let e = Error::from(e).add_public_msg("see timeout");
return Ok(crate::bodystream::ToPublicResponse::to_public_response(&e));
}
};
@@ -55,7 +55,7 @@ impl StatusNodesRecursive {
Ok(ret)
}
Err(e) => {
error!("{e}");
error!("StatusNodesRecursive sees: {e}");
let ret = crate::bodystream::ToPublicResponse::to_public_response(&e);
Ok(ret)
}

View File

@@ -2,11 +2,18 @@ use crate::bodystream::response;
use crate::err::Error;
use crate::ReqCtx;
use futures_util::StreamExt;
use http::{Method, Request, Response, StatusCode};
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use items_2::channelevents::ConnStatusEvent;
use netpod::log::*;
use netpod::query::ChannelStateEventsQuery;
use netpod::{FromUrl, NodeConfigCached, ACCEPT_ALL, APP_JSON};
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ACCEPT_ALL;
use netpod::APP_JSON;
use url::Url;
pub struct ConnectionStatusEvents {}
@@ -32,7 +39,7 @@ impl ConnectionStatusEvents {
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ChannelStateEventsQuery::from_url(&url)?;
match self.fetch_data(&q, node_config).await {
@@ -40,8 +47,11 @@ impl ConnectionStatusEvents {
let body = Body::from(serde_json::to_vec(&k)?);
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
Err(e) => {
error!("{e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?)
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
@@ -61,7 +71,7 @@ impl ConnectionStatusEvents {
.cluster
.scylla
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let scy = scyllaconn::create_scy_session(scyco).await?;
let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?;
let series = chconf.series;
@@ -100,7 +110,7 @@ impl ChannelStatusEvents {
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ChannelStateEventsQuery::from_url(&url)?;
match self.fetch_data(&q, node_config).await {
@@ -108,8 +118,11 @@ impl ChannelStatusEvents {
let body = Body::from(serde_json::to_vec(&k)?);
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
Err(e) => {
error!("{e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?)
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
@@ -129,13 +142,12 @@ impl ChannelStatusEvents {
.cluster
.scylla
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let scy = scyllaconn::create_scy_session(scyco).await?;
let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?;
let series = chconf.series;
let do_one_before_range = true;
let mut stream =
scyllaconn::status::StatusStreamScylla::new(series, q.range().clone(), do_one_before_range, scy);
scyllaconn::status::StatusStreamScylla::new(chconf.series, q.range().clone(), do_one_before_range, scy);
let mut ret = Vec::new();
while let Some(item) = stream.next().await {
let item = item?;

View File

@@ -28,7 +28,10 @@ impl EventsHandler {
}
match plain_events(req, node_config).await {
Ok(ret) => Ok(ret),
Err(e) => Ok(e.to_public_response()),
Err(e) => {
error!("EventsHandler sees {e}");
Ok(e.to_public_response())
}
}
}
}

View File

@@ -513,7 +513,7 @@ where
return Ok(res);
}
Err(e) => {
warn!("{e}");
warn!("FT sees: {e}");
let res = crate::bodystream::ToPublicResponse::to_public_response(&e);
return Ok(res);
}

View File

@@ -1,3 +1,5 @@
pub mod caioclookup;
use crate::bodystream::ToPublicResponse;
use crate::err::Error;
use crate::gather::gather_get_json_generic;
@@ -169,7 +171,7 @@ impl StatusNodesRecursive {
Ok(ret)
}
Err(e) => {
error!("{e}");
error!("StatusNodesRecursive sees: {e}");
let ret = crate::bodystream::ToPublicResponse::to_public_response(&e);
Ok(ret)
}

View File

@@ -0,0 +1,54 @@
use crate::bodystream::response;
use crate::err::Error;
use crate::ReqCtx;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::ProxyConfig;
pub struct CaIocLookup {}
impl CaIocLookup {
fn path() -> &'static str {
"/api/4/channel-access/search/addr"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == Self::path() {
Some(Self {})
} else {
None
}
}
pub async fn handle(
&self,
req: Request<Body>,
ctx: &ReqCtx,
node_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
match self.search(req, ctx, node_config).await {
Ok(status) => {
let body = serde_json::to_vec(&status)?;
let ret = response(StatusCode::OK).body(Body::from(body))?;
Ok(ret)
}
Err(e) => {
error!("sees: {e}");
let ret = crate::bodystream::ToPublicResponse::to_public_response(&e);
Ok(ret)
}
}
}
async fn search(
&self,
_req: Request<Body>,
_ctx: &ReqCtx,
_proxy_config: &ProxyConfig,
) -> Result<Option<String>, Error> {
Ok(None)
}
}

View File

@@ -110,6 +110,8 @@ pub trait TimeBinner: Send {
fn cycle(&mut self);
fn set_range_complete(&mut self);
fn empty(&self) -> Box<dyn TimeBinned>;
}
// TODO remove the Any bound. Factor out into custom AsAny trait.
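
The new empty() method on TimeBinner lets a caller obtain a well-formed, zero-bin TimeBinned of the binner's concrete output type; later in this commit the binned stream uses it when the input ends without any ready bins. A simplified sketch of the contract with stand-in traits (not the crate's items_0 types):

// Sketch of the empty() contract: return a typed result with zero bins rather
// than an error or nothing at all.
trait TimeBinnedLike {
    fn len(&self) -> usize;
}

#[derive(Default)]
struct BinsF32 {
    ts1s: Vec<u64>,
    ts2s: Vec<u64>,
    avgs: Vec<f32>,
}

impl TimeBinnedLike for BinsF32 {
    fn len(&self) -> usize {
        self.ts1s.len()
    }
}

trait TimeBinnerLike {
    fn empty(&self) -> Box<dyn TimeBinnedLike>;
}

struct BinsF32Binner;

impl TimeBinnerLike for BinsF32Binner {
    fn empty(&self) -> Box<dyn TimeBinnedLike> {
        // mirrors the real impls, which box Output::empty()
        Box::new(BinsF32::default())
    }
}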

View File

@@ -256,7 +256,7 @@ pub struct BinsDim0CollectedResult<NTY> {
#[serde(rename = "avgs")]
avgs: VecDeque<f32>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
finalised_range: bool,
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
timed_out: bool,
#[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
@@ -309,7 +309,7 @@ impl<NTY> BinsDim0CollectedResult<NTY> {
}
pub fn range_final(&self) -> bool {
self.finalised_range
self.range_final
}
pub fn missing_bins(&self) -> u32 {
@@ -343,7 +343,7 @@ impl<NTY: ScalarOps> ToJsonResult for BinsDim0CollectedResult<NTY> {
#[derive(Debug)]
pub struct BinsDim0Collector<NTY> {
timed_out: bool,
range_complete: bool,
range_final: bool,
vals: BinsDim0<NTY>,
}
@@ -351,7 +351,7 @@ impl<NTY> BinsDim0Collector<NTY> {
pub fn new() -> Self {
Self {
timed_out: false,
range_complete: false,
range_final: false,
vals: BinsDim0::<NTY>::empty(),
}
}
@@ -379,7 +379,7 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
}
fn set_range_complete(&mut self) {
self.range_complete = true;
self.range_final = true;
}
fn set_timed_out(&mut self) {
@@ -393,16 +393,23 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
0
};
let bin_count = self.vals.ts1s.len() as u32;
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
Some(&k) => {
let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
(missing_bins, Some(continue_at), Some(finished_at))
let (missing_bins, continue_at, finished_at) = if self.range_final {
if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
Some(&k) => {
let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
(missing_bins, Some(continue_at), Some(finished_at))
}
None => {
warn!("can not determine continue-at parameters");
(0, None, None)
}
}
None => Err(Error::with_msg("partial_content but no bin in result"))?,
} else {
(0, None, None)
}
} else {
(0, None, None)
@@ -429,7 +436,7 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
mins,
maxs,
avgs,
finalised_range: self.range_complete,
range_final: self.range_final,
timed_out: self.timed_out,
missing_bins,
continue_at,
@@ -769,6 +776,11 @@ impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
}
fn set_range_complete(&mut self) {}
fn empty(&self) -> Box<dyn items_0::TimeBinned> {
let ret = <BinsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();
Box::new(ret)
}
}
impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {
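
The collector change above makes missing-bin reporting conditional on the range being final: continue_at is the end of the last produced bin and finished_at extrapolates by the width of that bin; a non-final range, or one with no bins at all, reports nothing instead of erroring. A standalone sketch of the computation over plain nanosecond timestamps (the real code wraps them in IsoDateTime):

// Sketch of the continue-at logic from BinsDim0Collector::result (and its
// Xbin sibling below).
fn continue_at_params(
    range_final: bool,
    bin_count_exp: u32,
    ts1s: &[u64], // bin begin timestamps, ns
    ts2s: &[u64], // bin end timestamps, ns
) -> (u32, Option<u64>, Option<u64>) {
    let bin_count = ts1s.len() as u32;
    if range_final && bin_count < bin_count_exp {
        match (ts1s.last(), ts2s.last()) {
            (Some(&t1), Some(&t2)) => {
                let missing = bin_count_exp - bin_count;
                let finished_at = t2 + (t2 - t1) * missing as u64;
                (missing, Some(t2), Some(finished_at))
            }
            // no bins produced: warn in the real code, report nothing here
            _ => (0, None, None),
        }
    } else {
        (0, None, None)
    }
}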

View File

@@ -261,7 +261,7 @@ pub struct BinsXbinDim0CollectedResult<NTY> {
#[serde(rename = "avgs")]
avgs: VecDeque<f32>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "crate::bool_is_false")]
finalised_range: bool,
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")]
timed_out: bool,
#[serde(rename = "missingBins", default, skip_serializing_if = "Zero::is_zero")]
@@ -314,7 +314,7 @@ impl<NTY> BinsXbinDim0CollectedResult<NTY> {
}
pub fn range_final(&self) -> bool {
self.finalised_range
self.range_final
}
pub fn missing_bins(&self) -> u32 {
@@ -344,7 +344,7 @@ impl<NTY: ScalarOps> ToJsonResult for BinsXbinDim0CollectedResult<NTY> {
#[derive(Debug)]
pub struct BinsXbinDim0Collector<NTY> {
timed_out: bool,
range_complete: bool,
range_final: bool,
vals: BinsXbinDim0<NTY>,
}
@@ -352,7 +352,7 @@ impl<NTY> BinsXbinDim0Collector<NTY> {
pub fn new() -> Self {
Self {
timed_out: false,
range_complete: false,
range_final: false,
vals: BinsXbinDim0::<NTY>::empty(),
}
}
@@ -380,7 +380,7 @@ impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
}
fn set_range_complete(&mut self) {
self.range_complete = true;
self.range_final = true;
}
fn set_timed_out(&mut self) {
@@ -394,16 +394,23 @@ impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
0
};
let bin_count = self.vals.ts1s.len() as u32;
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
Some(&k) => {
let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
(missing_bins, Some(continue_at), Some(finished_at))
let (missing_bins, continue_at, finished_at) = if self.range_final {
if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
Some(&k) => {
let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
(missing_bins, Some(continue_at), Some(finished_at))
}
None => {
warn!("can not determine continue-at parameters");
(0, None, None)
}
}
None => Err(Error::with_msg("partial_content but no bin in result"))?,
} else {
(0, None, None)
}
} else {
(0, None, None)
@@ -430,7 +437,7 @@ impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
mins,
maxs,
avgs,
finalised_range: self.range_complete,
range_final: self.range_final,
timed_out: self.timed_out,
missing_bins,
continue_at,
@@ -770,6 +777,11 @@ impl<NTY: ScalarOps> TimeBinner for BinsXbinDim0TimeBinner<NTY> {
}
fn set_range_complete(&mut self) {}
fn empty(&self) -> Box<dyn items_0::TimeBinned> {
let ret = <BinsXbinDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();
Box::new(ret)
}
}
impl<NTY: ScalarOps> TimeBinned for BinsXbinDim0<NTY> {

View File

@@ -576,6 +576,13 @@ impl crate::timebin::TimeBinner for ChannelEventsTimeBinner {
None => (),
}
}
fn empty(&self) -> Option<Self::Output> {
match self.binner.as_ref() {
Some(binner) => Some(binner.empty()),
None => None,
}
}
}
impl crate::timebin::TimeBinnable for ChannelEvents {

View File

@@ -13,6 +13,12 @@ use std::any::Any;
use std::collections::VecDeque;
use std::{fmt, mem};
#[allow(unused)]
macro_rules! trace2 {
(EN$($arg:tt)*) => ();
($($arg:tt)*) => (trace!($($arg)*));
}
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct EventsDim0<NTY> {
pub tss: VecDeque<u64>,
@@ -286,10 +292,7 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
if let Some(range) = &range {
Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC))
} else {
// TODO tricky: should yield again the original range begin? Leads to recursion.
// Range begin plus delta?
// Anyway, we don't have the range begin here.
warn!("timed out without any result, can not yield a continue-at");
warn!("can not determine continue-at parameters");
None
}
}
@@ -820,13 +823,11 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
fn ingest(&mut self, item: &dyn TimeBinnable) {
let self_name = std::any::type_name::<Self>();
if true {
trace!(
"TimeBinner for EventsDim0TimeBinner {:?}\n{:?}\n------------------------------------",
self.edges.iter().take(2).collect::<Vec<_>>(),
item
);
}
trace2!(
"TimeBinner for EventsDim0TimeBinner {:?}\n{:?}\n------------------------------------",
self.edges.iter().take(2).collect::<Vec<_>>(),
item
);
if item.len() == 0 {
// Return already here, RangeOverlapInfo would not give much sense.
return;
@@ -949,6 +950,11 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
fn set_range_complete(&mut self) {
self.range_complete = true;
}
fn empty(&self) -> Box<dyn items_0::TimeBinned> {
let ret = <EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();
Box::new(ret)
}
}
// TODO remove this struct?
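
The trace2! helper added above keeps verbose call sites in the source but lets them be muted by prefixing the arguments with a marker token. A self-contained sketch of the same macro shape, forwarding to the log crate's trace! (the crate itself gets trace! via netpod::log):

// Sketch of the trace2! pattern: a leading EN token turns the call site into a
// no-op; otherwise the arguments are forwarded to trace! unchanged.
#[allow(unused)]
macro_rules! trace2 {
    (EN $($arg:tt)*) => {};
    ($($arg:tt)*) => { log::trace!($($arg)*) };
}

fn demo(edge_count: usize) {
    trace2!("binner edges: {}", edge_count); // logs at trace level
    trace2!(EN "per-item dump suppressed");  // expands to nothing
}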

View File

@@ -451,10 +451,7 @@ where
if let Some(range) = &range {
Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC))
} else {
// TODO tricky: should yield again the original range begin? Leads to recursion.
// Range begin plus delta?
// Anyway, we don't have the range begin here.
warn!("timed out without any result, can not yield a continue-at");
warn!("can not determine continue-at parameters");
None
}
}

View File

@@ -191,7 +191,7 @@ impl crate::merger::Mergeable for Box<dyn Events> {
}
// TODO rename to `Typed`
pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo {
pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo + Empty {
type Output: TimeBinnableType;
type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;
fn aggregator(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator;

View File

@@ -1,8 +1,8 @@
use std::fmt;
pub trait TimeBinner: fmt::Debug + Unpin {
type Input;
type Output;
type Input: fmt::Debug;
type Output: fmt::Debug;
fn ingest(&mut self, item: &mut Self::Input);
@@ -20,6 +20,8 @@ pub trait TimeBinner: fmt::Debug + Unpin {
/// to `push_in_progress` did not change the result count, as long as edges are left.
/// The next call to `Self::bins_ready_count` must return one higher count than before.
fn cycle(&mut self);
fn empty(&self) -> Option<Self::Output>;
}
pub trait TimeBinnable: fmt::Debug + Sized {

View File

@@ -1030,7 +1030,7 @@ impl Shape {
pub fn to_scylla_vec(&self) -> Vec<i32> {
use Shape::*;
match self {
Scalar => vec![],
Scalar => Vec::new(),
Wave(n) => vec![*n as i32],
Image(n, m) => vec![*n as i32, *m as i32],
}
@@ -1870,11 +1870,12 @@ pub enum ReadSys {
Read2,
Read3,
Read4,
Read5,
}
impl ReadSys {
pub fn default() -> Self {
Self::TokioAsyncRead
Self::Read5
}
}
@@ -1888,6 +1889,8 @@ impl From<&str> for ReadSys {
Self::Read3
} else if k == "Read4" {
Self::Read4
} else if k == "Read5" {
Self::Read5
} else {
Self::default()
}
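
ReadSys gains the Read5 variant and it becomes the default, so unconfigured nodes pick up the new channel-based reader; unknown config strings also fall back to it. An equivalent match-based sketch of the selection (standalone enum; the spellings for the first two variants are assumed here since the diff does not show those branches):

// Sketch of ReadSys selection with the new Read5 default.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReadSysSketch {
    TokioAsyncRead,
    Read2,
    Read3,
    Read4,
    Read5,
}

impl ReadSysSketch {
    fn default() -> Self {
        ReadSysSketch::Read5
    }
}

impl From<&str> for ReadSysSketch {
    fn from(k: &str) -> Self {
        match k {
            "TokioAsyncRead" => ReadSysSketch::TokioAsyncRead, // spelling assumed
            "Read2" => ReadSysSketch::Read2,                   // spelling assumed
            "Read3" => ReadSysSketch::Read3,
            "Read4" => ReadSysSketch::Read4,
            "Read5" => ReadSysSketch::Read5,
            _ => ReadSysSketch::default(),
        }
    }
}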

View File

@@ -286,6 +286,10 @@ impl Api1Query {
pub fn events_max(&self) -> Option<u64> {
self.events_max
}
pub fn set_decompress(&mut self, v: bool) {
self.decompress = v;
}
}
#[test]
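
Together with the Api1EventsBinaryHandler change above, the new setter lets the server override the client's decompress request: when the cluster serves from central storage, decompress is forced to false and the blobs are streamed as stored (DataApiPython3DataStream now honors its do_decompress field instead of a hard-coded true). A condensed sketch of that policy with stand-in types:

// Sketch of the central-storage decompress override.
struct Api1QuerySketch {
    decompress: bool,
}

impl Api1QuerySketch {
    fn decompress(&self) -> bool {
        self.decompress
    }
    fn set_decompress(&mut self, v: bool) {
        self.decompress = v;
    }
}

fn apply_storage_policy(qu: &mut Api1QuerySketch, is_central_storage: bool) {
    if is_central_storage {
        qu.set_decompress(false);
    }
}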

View File

@@ -41,7 +41,7 @@ async fn read_next_status_events(
);
// TODO use prepared!
let cql = concat!(
"select ts_lsp, pulse, kind from channel_status where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?"
"select ts_lsp, kind from channel_status where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?"
);
scy.query(
cql,
@@ -61,7 +61,7 @@ async fn read_next_status_events(
);
// TODO use prepared!
let cql = concat!(
"select ts_lsp, pulse, kind from channel_status where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1"
"select ts_lsp, kind from channel_status where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1"
);
scy.query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64))
.await
@@ -69,11 +69,10 @@ async fn read_next_status_events(
};
let mut last_before = None;
let mut ret = VecDeque::new();
for row in res.rows_typed_or_empty::<(i64, i64, i32)>() {
for row in res.rows_typed_or_empty::<(i64, i32)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let _pulse = row.1 as u64;
let kind = row.2 as u32;
let kind = row.1 as u32;
// from netfetch::store::ChannelStatus
let ev = ConnStatusEvent {
ts,
@@ -132,9 +131,10 @@ impl ReadValues {
fn next(&mut self) -> bool {
if let Some(ts_msp) = self.ts_msps.pop_front() {
self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 0);
self.fut = self.make_fut(ts_msp);
true
} else {
info!("no more msp");
false
}
}
@@ -142,8 +142,8 @@ impl ReadValues {
fn make_fut(
&mut self,
ts_msp: u64,
_has_more_msp: bool,
) -> Pin<Box<dyn Future<Output = Result<VecDeque<ConnStatusEvent>, Error>> + Send>> {
info!("make fut for {ts_msp}");
let fut = read_next_status_events(
self.series,
ts_msp,
@@ -168,7 +168,6 @@ pub struct StatusStreamScylla {
range: NanoRange,
do_one_before_range: bool,
scy: Arc<ScySession>,
ts_msps: VecDeque<u64>,
outbuf: VecDeque<ConnStatusEvent>,
}
@@ -180,7 +179,6 @@ impl StatusStreamScylla {
range,
do_one_before_range,
scy,
ts_msps: VecDeque::new(),
outbuf: VecDeque::new(),
}
}
@@ -202,13 +200,14 @@ impl Stream for StatusStreamScylla {
let mut ts_msps = VecDeque::new();
let mut ts = self.range.beg / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
while ts < self.range.end {
info!("Use ts {ts}");
ts_msps.push_back(ts);
ts += CONNECTION_STATUS_DIV;
}
let st = ReadValues::new(
self.series,
self.range.clone(),
self.ts_msps.clone(),
ts_msps,
true,
self.do_one_before_range,
self.scy.clone(),
@@ -227,7 +226,10 @@ impl Stream for StatusStreamScylla {
}
continue;
}
Ready(Err(e)) => Ready(Some(Err(e))),
Ready(Err(e)) => {
error!("{e}");
Ready(Some(Err(e)))
}
Pending => Pending,
},
FrState::Done => Ready(None),
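
The MSP list for the status query is now computed up front from the query range rather than kept on the stream struct: the range begin is rounded down to the partition width and stepped until the range end. A standalone sketch (the width used here is an assumed placeholder, not the crate's CONNECTION_STATUS_DIV):

// Sketch of the ts_msp precomputation used when starting the first read.
const PARTITION_WIDTH_NS: u64 = 60 * 60 * 1_000_000_000; // assumed: 1 h in ns

fn ts_msps_for_range(beg_ns: u64, end_ns: u64) -> Vec<u64> {
    let mut out = Vec::new();
    let mut ts = beg_ns / PARTITION_WIDTH_NS * PARTITION_WIDTH_NS;
    while ts < end_ns {
        out.push(ts);
        ts += PARTITION_WIDTH_NS;
    }
    out
}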

View File

@@ -57,7 +57,7 @@ where
break;
}
};
trace!("collect_in_span see item");
trace!("collect_in_span see item {item:?}");
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {

View File

@@ -36,6 +36,7 @@ pub async fn plain_events_json(
let stream = inp0.chain(inp1).chain(inp2);
stream
};
netpod::log::info!("plain_events_json with empty item {empty:?}");
let stream = { items_2::merger::Merger::new(inps, 1) };
let stream = stream::iter([empty]).chain(stream);
let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?;

View File

@@ -81,11 +81,14 @@ where
}
fn process_item(&mut self, mut item: T) -> () {
trace!("process_item {item:?}");
if self.binner.is_none() {
trace!("process_item call time_binner_new");
let binner = item.time_binner_new(self.edges.clone(), self.do_time_weight);
self.binner = Some(binner);
}
let binner = self.binner.as_mut().unwrap();
trace!("process_item call binner ingest");
binner.ingest(&mut item);
}
}
@@ -198,9 +201,17 @@ where
Ready(Some(Err(e)))
}
} else {
trace2!("no bins ready yet");
self.done_data = true;
continue;
if let Some(bins) = binner.empty() {
trace!("at end of stream, bin count zero, return {bins:?}");
self.done_data = true;
Ready(Some(sitem_data(bins)))
} else {
error!("at the end, no bins, can not get empty");
self.done_data = true;
let e = Error::with_msg_no_trace(format!("no bins"))
.add_public_msg(format!("unable to produce bins"));
Ready(Some(Err(e)))
}
}
} else {
trace2!("input stream finished, still no binner");
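
process_item builds the binner lazily from the first item (which knows its concrete type and receives the precomputed edges) and then ingests every item, including the first, into it. A compact sketch of that pattern; it uses get_or_insert_with where the committed code keeps the explicit is_none check, and the types are stand-ins:

// Sketch of the lazy-binner pattern in TimeBinnedStream::process_item.
struct BinnerSketch {
    ingested: usize,
}

impl BinnerSketch {
    fn ingest(&mut self, _item: &u32) {
        self.ingested += 1;
    }
}

fn new_binner_from(_first_item: &u32) -> BinnerSketch {
    // stands in for item.time_binner_new(edges, do_time_weight)
    BinnerSketch { ingested: 0 }
}

struct TimeBinnedStreamSketch {
    binner: Option<BinnerSketch>,
}

impl TimeBinnedStreamSketch {
    fn process_item(&mut self, item: u32) {
        let binner = self.binner.get_or_insert_with(|| new_binner_from(&item));
        binner.ingest(&item);
    }
}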

View File

@@ -35,7 +35,8 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu
);
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&rawquery, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
let stream = { items_2::merger::Merger::new(inps, 128) };
netpod::log::info!("timebinned_json with empty item {empty:?}");
let stream = items_2::merger::Merger::new(inps, 128);
let stream = stream::iter([empty]).chain(stream);
let stream = Box::pin(stream);
let stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.edges(), do_time_weight, deadline);
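
Both the plain-events and the time-binned JSON paths now prepend a prepared empty item to the merged input, so the downstream collector or binner always sees at least one typed item even for ranges with no events. A minimal sketch of that seeding with futures-util (integers stand in for the ChannelEvents items):

// Sketch of the seeding pattern: stream::iter([empty]).chain(merged).
use futures_util::stream::{self, StreamExt};

async fn seeded_demo() -> Vec<u32> {
    let merged = stream::iter(vec![1u32, 2, 3]); // stands in for Merger::new(inps, 128)
    let empty = 0u32;                            // stands in for the typed empty item
    let seeded = stream::iter([empty]).chain(merged);
    seeded.collect().await // yields [0, 1, 2, 3]
}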