From 326fe793cee2e72d28f43a718caa9f10b5a10925 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 8 Feb 2023 07:14:22 +0100 Subject: [PATCH] Switch to dyn events process --- dbconn/src/channelconfig.rs | 106 ++---- disk/Cargo.toml | 1 - disk/src/binnedstream.rs | 2 +- disk/src/channelconfig.rs | 40 +++ disk/src/decode.rs | 393 ++++++++++++++++++++++- disk/src/disk.rs | 35 +- disk/src/eventblobs.rs | 24 +- disk/src/merge.rs | 14 +- disk/src/merge/mergedblobsfromremotes.rs | 11 +- disk/src/raw/conn.rs | 194 ++--------- httpret/src/api4.rs | 1 + httpret/src/{ => api4}/events.rs | 27 +- httpret/src/channel_status.rs | 4 +- httpret/src/channelconfig.rs | 41 ++- httpret/src/httpret.rs | 26 +- items/src/eventfull.rs | 167 +++++----- items/src/items.rs | 3 + items_0/src/scalar_ops.rs | 20 +- items_0/src/subfr.rs | 4 + items_2/src/eventsdim0.rs | 28 +- items_2/src/items_2.rs | 96 +----- nodenet/src/channelconfig.rs | 84 +++++ nodenet/src/conn.rs | 20 +- nodenet/src/nodenet.rs | 1 + scyllaconn/src/bincache.rs | 34 +- streams/src/plaineventsjson.rs | 4 +- streams/src/timebinnedjson.rs | 6 +- 27 files changed, 867 insertions(+), 519 deletions(-) create mode 100644 disk/src/channelconfig.rs rename httpret/src/{ => api4}/events.rs (86%) create mode 100644 nodenet/src/channelconfig.rs diff --git a/dbconn/src/channelconfig.rs b/dbconn/src/channelconfig.rs index 0cf13f4..ad33277 100644 --- a/dbconn/src/channelconfig.rs +++ b/dbconn/src/channelconfig.rs @@ -15,7 +15,7 @@ use netpod::Shape; /// Otherwise we try to uniquely identify the series id from the given information. /// In the future, we can even try to involve time range information for that, but backends like /// old archivers and sf databuffer do not support such lookup. -pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> Result { +pub async fn chconf_from_scylla_type_backend(channel: &Channel, ncc: &NodeConfigCached) -> Result { if channel.backend != ncc.node_config.cluster.backend { warn!( "mismatched backend {} vs {}", @@ -23,59 +23,6 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> ); } let backend = channel.backend().into(); - if channel.backend() == "test-inmem" { - let ret = if channel.name() == "inmem-d0-i32" { - let ret = ChConf { - backend: channel.backend().into(), - series: 1, - name: channel.name().into(), - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else { - error!("no test information"); - Err(Error::with_msg_no_trace(format!("no test information")) - .add_public_msg("No channel config for test channel {:?}")) - }; - return ret; - } - if channel.backend() == "test-disk-databuffer" { - // TODO the series-ids here are just random. Need to integrate with better test setup. - let ret = if channel.name() == "scalar-i32-be" { - let ret = ChConf { - backend, - series: 1, - name: channel.name().into(), - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else if channel.name() == "wave-f64-be-n21" { - let ret = ChConf { - backend, - series: 2, - name: channel.name().into(), - scalar_type: ScalarType::F64, - shape: Shape::Wave(21), - }; - Ok(ret) - } else if channel.name() == "const-regular-scalar-i32-be" { - let ret = ChConf { - backend, - series: 3, - name: channel.name().into(), - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else { - error!("no test information"); - Err(Error::with_msg_no_trace(format!("no test information")) - .add_public_msg("No channel config for test channel {:?}")) - }; - return ret; - } let dbconf = &ncc.node_config.cluster.database; let pgclient = crate::create_connection(dbconf).await?; if let Some(series) = channel.series() { @@ -106,36 +53,41 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> Ok(ret) } } else { - let res = pgclient + if ncc.node_config.cluster.scylla.is_some() { + let res = pgclient .query( "select channel, series, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2", &[&channel.backend(), &channel.name()], ) .await .err_conv()?; - if res.len() < 1 { - warn!("can not find channel information for {channel:?}"); - let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); - Err(e) - } else if res.len() > 1 { - warn!("ambigious channel {channel:?}"); - let e = Error::with_public_msg_no_trace(format!("ambigious channel {channel:?}")); - Err(e) + if res.len() < 1 { + warn!("can not find channel information for {channel:?}"); + let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); + Err(e) + } else if res.len() > 1 { + warn!("ambigious channel {channel:?}"); + let e = Error::with_public_msg_no_trace(format!("ambigious channel {channel:?}")); + Err(e) + } else { + let row = res.first().unwrap(); + let name: String = row.get(0); + let series = row.get::<_, i64>(1) as u64; + let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(2) as u8)?; + // TODO can I get a slice from psql driver? + let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?; + let ret = ChConf { + backend, + series, + name, + scalar_type, + shape, + }; + Ok(ret) + } } else { - let row = res.first().unwrap(); - let name: String = row.get(0); - let series = row.get::<_, i64>(1) as u64; - let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(2) as u8)?; - // TODO can I get a slice from psql driver? - let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?; - let ret = ChConf { - backend, - series, - name, - scalar_type, - shape, - }; - Ok(ret) + error!("TODO xm89ur8932cr"); + Err(Error::with_msg_no_trace("TODO xm89ur8932cr")) } } } diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 64c7ecc..059996b 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -22,7 +22,6 @@ bytes = "1.0.1" crc32fast = "1.3.2" arrayref = "0.3.6" byteorder = "1.4.3" -futures-core = "0.3.14" futures-util = "0.3.14" async-stream = "0.3.0" tracing = "0.1.25" diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index cf6c2b7..e4a59dd 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -1,5 +1,5 @@ use err::Error; -use futures_core::Stream; +use futures_util::Stream; use futures_util::StreamExt; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs new file mode 100644 index 0000000..82acd49 --- /dev/null +++ b/disk/src/channelconfig.rs @@ -0,0 +1,40 @@ +use err::Error; +use netpod::Channel; +use netpod::ChannelConfig; +use netpod::NanoRange; +use netpod::NodeConfigCached; +use parse::channelconfig::extract_matching_config_entry; +use parse::channelconfig::read_local_config; +use parse::channelconfig::MatchingConfigEntry; + +pub async fn config( + range: NanoRange, + channel: Channel, + node_config: &NodeConfigCached, +) -> Result { + let channel_config = read_local_config(channel.clone(), node_config.node.clone()).await?; + let entry_res = match extract_matching_config_entry(&range, &channel_config) { + Ok(k) => k, + Err(e) => return Err(e)?, + }; + let entry = match entry_res { + MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?, + MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?, + MatchingConfigEntry::Entry(entry) => entry, + }; + let shape = match entry.to_shape() { + Ok(k) => k, + Err(e) => return Err(e)?, + }; + let channel_config = ChannelConfig { + channel: channel.clone(), + keyspace: entry.ks as u8, + time_bin_size: entry.bs, + shape: shape, + scalar_type: entry.scalar_type.clone(), + byte_order: entry.byte_order.clone(), + array: entry.is_array, + compression: entry.is_compressed, + }; + Ok(channel_config) +} diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 79b0870..22d7898 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,20 +1,41 @@ use crate::agg::enp::Identity; use crate::eventblobs::EventChunkerMultifile; use err::Error; -use futures_core::Stream; +use futures_util::Stream; use futures_util::StreamExt; use items::eventfull::EventFull; use items::eventsitem::EventsItem; -use items::numops::{BoolNum, NumOps, StringNum}; -use items::plainevents::{PlainEvents, ScalarPlainEvents}; +use items::numops::BoolNum; +use items::numops::NumOps; +use items::numops::StringNum; +use items::plainevents::PlainEvents; +use items::plainevents::ScalarPlainEvents; use items::scalarevents::ScalarEvents; -use items::waveevents::{WaveEvents, WaveNBinner, WavePlainProc, WaveXBinner}; -use items::{Appendable, EventAppendable, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem}; -use netpod::{ScalarType, Shape}; +use items::waveevents::WaveEvents; +use items::waveevents::WaveNBinner; +use items::waveevents::WavePlainProc; +use items::waveevents::WaveXBinner; +use items::Appendable; +use items::EventAppendable; +use items::EventsNodeProcessor; +use items::RangeCompletableItem; +use items::Sitemty; +use items::StreamItem; +use items_0::scalar_ops::ScalarOps; +use items_0::Events; +use items_2::eventsdim0::EventsDim0; +use items_2::eventsdim1::EventsDim1; +#[allow(unused)] +use netpod::log::*; +use netpod::AggKind; +use netpod::ScalarType; +use netpod::Shape; use std::marker::PhantomData; +use std::mem; use std::mem::size_of; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; pub trait Endianness: Send + Unpin { fn is_big() -> bool; @@ -32,6 +53,11 @@ impl Endianness for BigEndian { } } +pub enum Endian { + Little, + Big, +} + pub trait NumFromBytes { fn convert(buf: &[u8], big_endian: bool) -> NTY; } @@ -113,6 +139,247 @@ impl_num_from_bytes!(i64, 8); impl_num_from_bytes!(f32, 4); impl_num_from_bytes!(f64, 8); +pub trait ScalarValueFromBytes { + fn convert(buf: &[u8], endian: Endian) -> Result; + fn convert_dim1(buf: &[u8], endian: Endian, n: usize) -> Result, Error>; +} + +macro_rules! impl_scalar_value_from_bytes { + ($nty:ident, $nl:expr) => { + impl ScalarValueFromBytes<$nty> for $nty { + // Error in data on disk: + // Can not rely on byte order as stated in the channel config. + // Endianness in sf-databuffer can be specified for each event. + fn convert(buf: &[u8], endian: Endian) -> Result<$nty, Error> { + //$nty::$ec(*arrayref::array_ref![buf, 0, $nl]) + use Endian::*; + let ret = match endian { + Little => $nty::from_le_bytes(buf[..$nl].try_into()?), + Big => $nty::from_be_bytes(buf[..$nl].try_into()?), + }; + Ok(ret) + } + + fn convert_dim1(buf: &[u8], endian: Endian, n: usize) -> Result, Error> { + let ret = buf + .chunks_exact(n.min($nl)) + .map(|b2| { + use Endian::*; + let ret = match endian { + Little => $nty::from_le_bytes(b2[..$nl].try_into().unwrap()), + Big => $nty::from_be_bytes(b2[..$nl].try_into().unwrap()), + }; + ret + }) + .collect(); + Ok(ret) + } + } + }; +} + +impl_scalar_value_from_bytes!(u8, 1); +impl_scalar_value_from_bytes!(u16, 2); +impl_scalar_value_from_bytes!(u32, 4); +impl_scalar_value_from_bytes!(u64, 8); +impl_scalar_value_from_bytes!(i8, 1); +impl_scalar_value_from_bytes!(i16, 2); +impl_scalar_value_from_bytes!(i32, 4); +impl_scalar_value_from_bytes!(i64, 8); +impl_scalar_value_from_bytes!(f32, 4); +impl_scalar_value_from_bytes!(f64, 8); + +impl ScalarValueFromBytes for String { + fn convert(buf: &[u8], _endian: Endian) -> Result { + let s = if buf.len() >= 255 { + String::from_utf8_lossy(&buf[..255]) + } else { + String::from_utf8_lossy(buf) + }; + Ok(s.into()) + } + + fn convert_dim1(buf: &[u8], _endian: Endian, _n: usize) -> Result, Error> { + let s = if buf.len() >= 255 { + String::from_utf8_lossy(&buf[..255]) + } else { + String::from_utf8_lossy(buf) + }; + Ok(vec![s.into()]) + } +} + +impl ScalarValueFromBytes for bool { + fn convert(buf: &[u8], _endian: Endian) -> Result { + if buf.len() >= 1 { + if buf[0] != 0 { + Ok(true) + } else { + Ok(false) + } + } else { + Ok(false) + } + } + + fn convert_dim1(buf: &[u8], _endian: Endian, n: usize) -> Result, Error> { + let nn = buf.len().min(n); + Ok(buf.iter().take(nn).map(|&x| x != 0).collect()) + } +} + +pub trait ValueFromBytes: Send { + fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error>; +} + +pub trait ValueDim0FromBytes { + fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error>; +} + +pub trait ValueDim1FromBytes { + fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error>; +} + +pub struct ValueDim0FromBytesImpl +where + STY: ScalarOps, +{ + _m1: PhantomData, +} + +impl ValueDim0FromBytesImpl +where + STY: ScalarOps + ScalarValueFromBytes, +{ + fn boxed() -> Box { + Box::new(Self { + _m1: Default::default(), + }) + } +} + +impl ValueDim0FromBytes for ValueDim0FromBytesImpl +where + STY: ScalarOps + ScalarValueFromBytes, +{ + fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> { + if let Some(evs) = events.as_any_mut().downcast_mut::>() { + let v = >::convert(buf, endian)?; + evs.values.push_back(v); + evs.tss.push_back(ts); + evs.pulses.push_back(pulse); + Ok(()) + } else { + Err(Error::with_msg_no_trace("unexpected container")) + } + } +} + +impl ValueFromBytes for ValueDim0FromBytesImpl +where + STY: ScalarOps + ScalarValueFromBytes, +{ + fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> { + ValueDim0FromBytes::convert(self, ts, pulse, buf, endian, events) + } +} + +pub struct ValueDim1FromBytesImpl +where + STY: ScalarOps, +{ + shape: Shape, + _m1: PhantomData, +} + +impl ValueDim1FromBytesImpl +where + STY: ScalarOps + ScalarValueFromBytes, +{ + fn boxed(shape: Shape) -> Box { + Box::new(Self { + shape, + _m1: Default::default(), + }) + } +} + +impl ValueFromBytes for ValueDim1FromBytesImpl +where + STY: ScalarOps + ScalarValueFromBytes, +{ + fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> { + ValueDim1FromBytes::convert(self, ts, pulse, buf, endian, events) + } +} + +impl ValueDim1FromBytes for ValueDim1FromBytesImpl +where + STY: ScalarOps + ScalarValueFromBytes, +{ + fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> { + if let Some(evs) = events.as_any_mut().downcast_mut::>() { + let n = if let Shape::Wave(n) = self.shape { + n + } else { + return Err(Error::with_msg_no_trace("ValueDim1FromBytesImpl bad shape")); + }; + let v = >::convert_dim1(buf, endian, n as _)?; + evs.values.push_back(v); + evs.tss.push_back(ts); + evs.pulses.push_back(pulse); + Ok(()) + } else { + Err(Error::with_msg_no_trace("unexpected container")) + } + } +} + +fn make_scalar_conv( + scalar_type: &ScalarType, + shape: &Shape, + agg_kind: &AggKind, +) -> Result, Error> { + let ret = match agg_kind { + AggKind::EventBlobs => todo!("make_scalar_conv EventBlobs"), + AggKind::Plain | AggKind::DimXBinsN(_) | AggKind::DimXBins1 | AggKind::TimeWeightedScalar => match shape { + Shape::Scalar => match scalar_type { + ScalarType::U8 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::U16 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::U32 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::U64 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::I8 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::I16 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::I32 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::I64 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::F32 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::F64 => ValueDim0FromBytesImpl::::boxed(), + ScalarType::BOOL => ValueDim0FromBytesImpl::::boxed(), + ScalarType::STRING => ValueDim0FromBytesImpl::::boxed(), + }, + Shape::Wave(_) => { + let shape = shape.clone(); + match scalar_type { + ScalarType::U8 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::U16 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::U32 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::U64 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::I8 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::I16 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::I32 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::I64 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::F32 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::F64 => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::BOOL => ValueDim1FromBytesImpl::::boxed(shape), + ScalarType::STRING => ValueDim1FromBytesImpl::::boxed(shape), + } + } + Shape::Image(_, _) => todo!("make_scalar_conv Image"), + }, + }; + Ok(ret) +} + pub trait EventValueFromBytes where NTY: NumFromBytes, @@ -339,18 +606,102 @@ where } pub struct EventsDynStream { + scalar_type: ScalarType, + shape: Shape, + agg_kind: AggKind, events_full: EventChunkerMultifile, + events_out: Box, + scalar_conv: Box, + emit_threshold: usize, done: bool, complete: bool, } impl EventsDynStream { - pub fn new(events_full: EventChunkerMultifile) -> Self { - Self { + pub fn new( + scalar_type: ScalarType, + shape: Shape, + agg_kind: AggKind, + events_full: EventChunkerMultifile, + ) -> Result { + let st = &scalar_type; + let sh = &shape; + let ag = &agg_kind; + let events_out = items_2::empty_events_dyn_ev(st, sh, ag)?; + let scalar_conv = make_scalar_conv(st, sh, ag)?; + let emit_threshold = match &shape { + Shape::Scalar => 2048, + Shape::Wave(_) => 64, + Shape::Image(_, _) => 1, + }; + let ret = Self { + scalar_type, + shape, + agg_kind, events_full, + events_out, + scalar_conv, + emit_threshold, done: false, complete: false, + }; + Ok(ret) + } + + fn replace_events_out(&mut self) -> Result, Error> { + let st = &self.scalar_type; + let sh = &self.shape; + let ag = &self.agg_kind; + let empty = items_2::empty_events_dyn_ev(st, sh, ag)?; + let evs = mem::replace(&mut self.events_out, empty); + Ok(evs) + } + + fn handle_event_full(&mut self, item: EventFull) -> Result<(), Error> { + use items::WithLen; + if item.len() >= self.emit_threshold { + info!("handle_event_full item len {}", item.len()); } + for (((buf, &be), &ts), &pulse) in item + .blobs + .iter() + .zip(item.be.iter()) + .zip(item.tss.iter()) + .zip(item.pulses.iter()) + { + let endian = if be { Endian::Big } else { Endian::Little }; + self.scalar_conv + .convert(ts, pulse, buf, endian, self.events_out.as_mut())?; + } + Ok(()) + } + + fn handle_stream_item( + &mut self, + item: StreamItem>, + ) -> Result>>, Error> { + let ret = match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) + } + RangeCompletableItem::Data(item) => match self.handle_event_full(item) { + Ok(()) => { + // TODO collect stats. + if self.events_out.len() >= self.emit_threshold { + let evs = self.replace_events_out()?; + Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(evs)))) + } else { + None + } + } + Err(e) => Some(Err(e)), + }, + }, + StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))), + StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), + }; + Ok(ret) } } @@ -367,16 +718,30 @@ impl Stream for EventsDynStream { Ready(None) } else { match self.events_full.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - todo!() - } + Ready(Some(Ok(item))) => match self.handle_stream_item(item) { + Ok(Some(item)) => Ready(Some(item)), + Ok(None) => continue, + Err(e) => { + self.done = true; + Ready(Some(Err(e))) + } + }, Ready(Some(Err(e))) => { self.done = true; Ready(Some(Err(e))) } Ready(None) => { - self.done = true; - continue; + // Produce a last one even if it is empty. + match self.replace_events_out() { + Ok(item) => { + self.done = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) + } + Err(e) => { + self.done = true; + Ready(Some(Err(e))) + } + } } Pending => Pending, } diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 6a430ad..0843928 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -3,6 +3,7 @@ pub mod agg; pub mod aggtest; pub mod binnedstream; pub mod cache; +pub mod channelconfig; pub mod dataopen; pub mod decode; pub mod eventblobs; @@ -20,12 +21,17 @@ pub mod streamlog; use bytes::Bytes; use bytes::BytesMut; use err::Error; -use futures_core::Stream; use futures_util::future::FusedFuture; -use futures_util::{FutureExt, StreamExt, TryFutureExt}; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use futures_util::TryFutureExt; use netpod::log::*; +use netpod::ChannelConfig; +use netpod::DiskIoTune; +use netpod::Node; use netpod::ReadSys; -use netpod::{ChannelConfig, DiskIoTune, Node, Shape}; +use netpod::Shape; use std::collections::VecDeque; use std::future::Future; use std::io::SeekFrom; @@ -37,12 +43,17 @@ use std::task::Context; use std::task::Poll; use std::time::Duration; use std::time::Instant; -use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; +use streams::dtflags::ARRAY; +use streams::dtflags::BIG_ENDIAN; +use streams::dtflags::COMPRESSION; +use streams::dtflags::SHAPE; use streams::filechunkread::FileChunkRead; use tokio::fs::File; use tokio::fs::OpenOptions; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; use tokio::io::ReadBuf; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt}; use tokio::sync::mpsc; // TODO transform this into a self-test or remove. @@ -257,14 +268,14 @@ fn start_read5( } }; let mut pos = pos_beg; - info!("start_read5 BEGIN {disk_io_tune:?}"); + info!("read5 begin {disk_io_tune:?}"); loop { let mut buf = BytesMut::new(); - buf.resize(1024 * 256, 0); + buf.resize(disk_io_tune.read_buffer_len, 0); match tokio::time::timeout(Duration::from_millis(8000), file.read(&mut buf)).await { Ok(Ok(n)) => { if n == 0 { - info!("read5 EOF pos_beg {pos_beg} pos {pos} path {path:?}"); + //info!("read5 EOF pos_beg {pos_beg} pos {pos} path {path:?}"); break; } pos += n as u64; @@ -273,7 +284,7 @@ fn start_read5( match tx.send(Ok(item)).await { Ok(()) => {} Err(_) => { - error!("broken channel"); + //error!("broken channel"); break; } } @@ -283,7 +294,7 @@ fn start_read5( break; } Err(_) => { - error!("broken channel"); + //error!("broken channel"); break; } }, @@ -294,7 +305,7 @@ fn start_read5( match tx.send(Err(e)).await { Ok(()) => {} Err(_e) => { - error!("broken channel"); + //error!("broken channel"); break; } } @@ -302,6 +313,8 @@ fn start_read5( } } } + let n = pos - pos_beg; + info!("read5 done {n}"); }; tokio::task::spawn(fut); Ok(()) diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 8647d4b..e2e1a77 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,16 +1,26 @@ -use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet}; +use crate::dataopen::open_expanded_files; +use crate::dataopen::open_files; +use crate::dataopen::OpenedFileSet; use crate::merge::MergedStream; use err::Error; -use futures_core::Stream; +use futures_util::Stream; use futures_util::StreamExt; use items::eventfull::EventFull; -use items::{LogItem, RangeCompletableItem, Sitemty, StreamItem}; +use items::LogItem; +use items::RangeCompletableItem; +use items::Sitemty; +use items::StreamItem; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::{ChannelConfig, DiskIoTune, NanoRange, Node}; +use netpod::ChannelConfig; +use netpod::DiskIoTune; +use netpod::NanoRange; +use netpod::Node; use std::pin::Pin; -use std::task::{Context, Poll}; -use streams::eventchunker::{EventChunker, EventChunkerConf}; +use std::task::Context; +use std::task::Poll; +use streams::eventchunker::EventChunker; +use streams::eventchunker::EventChunkerConf; use streams::rangefilter::RangeFilter; pub trait InputTraits: Stream> {} @@ -97,7 +107,7 @@ impl Stream for EventChunkerMultifile { Some(evs) => match evs.poll_next_unpin(cx) { Ready(Some(k)) => { if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))) = &k { - if let Some(&g) = h.tss.last() { + if let Some(&g) = h.tss.back() { if g == self.max_ts { let msg = format!("EventChunkerMultifile repeated ts {}", g); error!("{}", msg); diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 794174c..d8521cb 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,16 +1,24 @@ pub mod mergedblobsfromremotes; use err::Error; -use futures_core::Stream; +use futures_util::Stream; use futures_util::StreamExt; +use items::Appendable; use items::ByteEstimate; -use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithTimestamps}; +use items::LogItem; +use items::PushableIndex; +use items::RangeCompletableItem; +use items::Sitemty; +use items::StatsItem; +use items::StreamItem; +use items::WithTimestamps; use netpod::histo::HistoLog2; use netpod::log::*; use netpod::ByteSize; use std::collections::VecDeque; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; const LOG_EMIT_ITEM: bool = false; diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index 2a714bd..56d46c1 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -1,15 +1,18 @@ use crate::merge::MergedStream; use err::Error; -use futures_core::Stream; -use futures_util::{pin_mut, StreamExt}; +use futures_util::pin_mut; +use futures_util::Stream; +use futures_util::StreamExt; use items::eventfull::EventFull; use items::Sitemty; use netpod::log::*; use netpod::query::PlainEventsQuery; -use netpod::{Cluster, PerfOpts}; +use netpod::Cluster; +use netpod::PerfOpts; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; use streams::tcprawclient::x_processed_event_blobs_stream_from_node; type T001 = Pin> + Send>>; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index f12df8f..eb11557 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,31 +1,16 @@ -use crate::decode::BigEndian; -use crate::decode::Endianness; -use crate::decode::EventValueFromBytes; -use crate::decode::EventValueShape; -use crate::decode::EventValuesDim0Case; -use crate::decode::EventValuesDim1Case; -use crate::decode::EventsDecodedStream; -use crate::decode::LittleEndian; -use crate::decode::NumFromBytes; use crate::eventblobs::EventChunkerMultifile; use err::Error; +use futures_util::stream; use futures_util::Stream; use futures_util::StreamExt; use items::eventfull::EventFull; -use items::numops::BoolNum; -use items::numops::NumOps; -use items::numops::StringNum; -use items::EventsNodeProcessor; -use items::Framable; use items::RangeCompletableItem; use items::Sitemty; use items::StreamItem; use items_2::channelevents::ChannelEvents; -use items_2::eventsdim0::EventsDim0; use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::AggKind; -use netpod::ByteOrder; use netpod::ByteSize; use netpod::Channel; use netpod::DiskIoTune; @@ -37,167 +22,35 @@ use parse::channelconfig::extract_matching_config_entry; use parse::channelconfig::read_local_config; use parse::channelconfig::ConfigEntry; use parse::channelconfig::MatchingConfigEntry; -use std::collections::VecDeque; use std::pin::Pin; use streams::eventchunker::EventChunkerConf; -fn make_num_pipeline_stream_evs( - event_value_shape: EVS, - events_node_proc: ENP, +fn make_num_pipeline_stream_evs( + scalar_type: ScalarType, + shape: Shape, + agg_kind: AggKind, event_blobs: EventChunkerMultifile, -) -> Pin> + Send>> -where - NTY: NumOps + NumFromBytes + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - Sitemty<::Output>: Framable + 'static, - ::Output: 'static, -{ - let decs = EventsDecodedStream::::new(event_value_shape, event_blobs); - let s2 = StreamExt::map(decs, move |item| match item { +) -> Pin> + Send>> { + let event_stream = match crate::decode::EventsDynStream::new(scalar_type, shape, agg_kind, event_blobs) { + Ok(k) => k, + Err(e) => { + return Box::pin(stream::iter([Err(e)])); + } + }; + let stream = event_stream.map(|item| match item { Ok(item) => match item { StreamItem::DataItem(item) => match item { - RangeCompletableItem::Data(item) => { - // TODO fix super ugly slow glue code - use items::EventsNodeProcessorOutput; - let mut item = events_node_proc.process(item); - if let Some(item) = item - .as_any_mut() - .downcast_mut::>() - { - trace!("ScalarEvents"); - let tss: VecDeque = item.tss.iter().map(|x| *x).collect(); - let pulses: VecDeque = item.pulses.iter().map(|x| *x).collect(); - let values: VecDeque = item.values.iter().map(|x| x.clone()).collect(); - let item = EventsDim0 { tss, pulses, values }; - let item = ChannelEvents::Events(Box::new(item)); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) - } else if let Some(item) = item.as_any_mut().downcast_mut::>() { - trace!("WaveEvents"); - let _tss: VecDeque = item.tss.iter().map(|x| *x).collect(); - let _pulses: VecDeque = item.pulses.iter().map(|x| *x).collect(); - let _values: VecDeque> = item.vals.iter().map(|x| x.clone()).collect(); - //let item = EventsDim1 { tss, pulses, values }; - //let item = ChannelEvents::Events(Box::new(item)); - //Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } else if let Some(item) = item - .as_any_mut() - .downcast_mut::>() - { - trace!("XBinnedScalarEvents"); - let tss: VecDeque = item.tss.iter().map(|x| *x).collect(); - let pulses: VecDeque = (0..tss.len()).map(|_| 0).collect(); - let _avgs: VecDeque = item.avgs.iter().map(|x| x.clone()).collect(); - let mins: VecDeque = item.mins.iter().map(|x| x.clone()).collect(); - let _maxs: VecDeque = item.maxs.iter().map(|x| x.clone()).collect(); - let item = EventsDim0 { - tss, - pulses, - values: mins, - }; - let item = ChannelEvents::Events(Box::new(item)); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) - } else { - error!("TODO bad, no idea what this item is\n\n{:?}\n\n", item); - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } - } RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), + RangeCompletableItem::Data(item) => Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Events(item), + ))), }, - StreamItem::Log(item) => Ok(StreamItem::Log(item)), - StreamItem::Stats(item) => Ok(StreamItem::Stats(item)), + StreamItem::Log(k) => Ok(StreamItem::Log(k)), + StreamItem::Stats(k) => Ok(StreamItem::Stats(k)), }, Err(e) => Err(e), }); - Box::pin(s2) -} - -macro_rules! pipe4 { - ($nty:ident, $end:ident, $shape:expr, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => { - match $agg_kind { - AggKind::EventBlobs => panic!(), - AggKind::TimeWeightedScalar | AggKind::DimXBins1 => { - make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>( - $evsv, - <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape, $agg_kind), - $event_blobs, - ) - } - AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>( - $evsv, - <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins::create($shape, $agg_kind), - $event_blobs, - ), - AggKind::Plain => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>( - $evsv, - <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain::create($shape, $agg_kind), - $event_blobs, - ), - } - }; -} - -macro_rules! pipe3 { - ($nty:ident, $end:ident, $shape:expr, $agg_kind:expr, $event_blobs:expr) => { - match $shape { - Shape::Scalar => { - pipe4!( - $nty, - $end, - $shape, - EventValuesDim0Case, - EventValuesDim0Case::<$nty>::new(), - $agg_kind, - $event_blobs - ) - } - Shape::Wave(n) => { - pipe4!( - $nty, - $end, - $shape, - EventValuesDim1Case, - EventValuesDim1Case::<$nty>::new(n), - $agg_kind, - $event_blobs - ) - } - Shape::Image(_, _) => { - // TODO not needed for python data api v3 protocol, but later for api4. - err::todoval() - } - } - }; -} - -macro_rules! pipe2 { - ($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => { - match $end { - ByteOrder::Little => pipe3!($nty, LittleEndian, $shape, $agg_kind, $event_blobs), - ByteOrder::Big => pipe3!($nty, BigEndian, $shape, $agg_kind, $event_blobs), - } - }; -} - -macro_rules! pipe1 { - ($nty:expr, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => { - match $nty { - ScalarType::U8 => pipe2!(u8, $end, $shape, $agg_kind, $event_blobs), - ScalarType::U16 => pipe2!(u16, $end, $shape, $agg_kind, $event_blobs), - ScalarType::U32 => pipe2!(u32, $end, $shape, $agg_kind, $event_blobs), - ScalarType::U64 => pipe2!(u64, $end, $shape, $agg_kind, $event_blobs), - ScalarType::I8 => pipe2!(i8, $end, $shape, $agg_kind, $event_blobs), - ScalarType::I16 => pipe2!(i16, $end, $shape, $agg_kind, $event_blobs), - ScalarType::I32 => pipe2!(i32, $end, $shape, $agg_kind, $event_blobs), - ScalarType::I64 => pipe2!(i64, $end, $shape, $agg_kind, $event_blobs), - ScalarType::F32 => pipe2!(f32, $end, $shape, $agg_kind, $event_blobs), - ScalarType::F64 => pipe2!(f64, $end, $shape, $agg_kind, $event_blobs), - ScalarType::BOOL => pipe2!(BoolNum, $end, $shape, $agg_kind, $event_blobs), - ScalarType::STRING => pipe2!(StringNum, $end, $shape, $agg_kind, $event_blobs), - } - }; + Box::pin(stream) } pub async fn make_event_pipe( @@ -262,12 +115,11 @@ pub async fn make_event_pipe( true, ); let shape = entry.to_shape()?; - let pipe = pipe1!( - entry.scalar_type, - entry.byte_order, - shape, + let pipe = make_num_pipeline_stream_evs( + entry.scalar_type.clone(), + shape.clone(), evq.agg_kind().clone(), - event_blobs + event_blobs, ); Ok(pipe) } diff --git a/httpret/src/api4.rs b/httpret/src/api4.rs index 69d250f..6f9f3bf 100644 --- a/httpret/src/api4.rs +++ b/httpret/src/api4.rs @@ -1,3 +1,4 @@ pub mod binned; +pub mod events; pub mod search; pub mod status; diff --git a/httpret/src/events.rs b/httpret/src/api4/events.rs similarity index 86% rename from httpret/src/events.rs rename to httpret/src/api4/events.rs index ec1e094..663a568 100644 --- a/httpret/src/events.rs +++ b/httpret/src/api4/events.rs @@ -1,14 +1,23 @@ -use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json}; +use crate::channelconfig::chconf_from_events_v1; use crate::err::Error; -use crate::{response, response_err, BodyStream, ToPublicResponse}; -use futures_util::{stream, TryStreamExt}; -use http::{Method, Request, Response, StatusCode}; +use crate::response; +use crate::response_err; +use crate::BodyStream; +use crate::ToPublicResponse; +use futures_util::stream; +use futures_util::TryStreamExt; +use http::Method; +use http::Request; +use http::Response; +use http::StatusCode; use hyper::Body; use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::FromUrl; use netpod::NodeConfigCached; -use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; +use netpod::ACCEPT_ALL; +use netpod::APP_JSON; +use netpod::APP_OCTET; use url::Url; pub struct EventsHandler {} @@ -29,7 +38,7 @@ impl EventsHandler { match plain_events(req, node_config).await { Ok(ret) => Ok(ret), Err(e) => { - error!("EventsHandler sees {e}"); + error!("EventsHandler sees: {e}"); Ok(e.to_public_response()) } } @@ -65,7 +74,7 @@ async fn plain_events_binary( ) -> Result, Error> { debug!("httpret plain_events_binary req: {:?}", req); let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; - let chconf = chconf_from_events_binary(&query, node_config).await?; + let chconf = chconf_from_events_v1(&query, node_config).await?; // Update the series id since we don't require some unique identifier yet. let mut query = query; query.set_series_id(chconf.series); @@ -89,9 +98,7 @@ async fn plain_events_json( let (_head, _body) = req.into_parts(); let query = PlainEventsQuery::from_url(&url)?; info!("plain_events_json query {query:?}"); - let chconf = chconf_from_events_json(&query, node_config) - .await - .map_err(Error::from)?; + let chconf = chconf_from_events_v1(&query, node_config).await.map_err(Error::from)?; // Update the series id since we don't require some unique identifier yet. let mut query = query; query.set_series_id(chconf.series); diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index 49b2d65..c3bf227 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -74,7 +74,7 @@ impl ConnectionStatusEvents { .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let _scy = scyllaconn::create_scy_session(scyco).await?; - let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?; + let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?; let _series = chconf.series; let _do_one_before_range = true; let ret = Vec::new(); @@ -148,7 +148,7 @@ impl ChannelStatusEvents { .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let scy = scyllaconn::create_scy_session(scyco).await?; - let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?; + let chconf = nodenet::channelconfig::channel_config(q.range().clone(),q.channel().clone(), node_config).await?; let do_one_before_range = true; let mut stream = scyllaconn::status::StatusStreamScylla::new(chconf.series, q.range().clone(), do_one_before_range, scy); diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 9a82042..9bdb36e 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -1,34 +1,42 @@ use crate::err::Error; -use crate::{response, ToPublicResponse}; -use dbconn::channelconfig::chconf_from_database; +use crate::response; +use crate::ToPublicResponse; use dbconn::create_connection; use futures_util::StreamExt; -use http::{Method, Request, Response, StatusCode}; +use http::Method; +use http::Request; +use http::Response; +use http::StatusCode; use hyper::Body; use netpod::get_url_query_pairs; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; -use netpod::query::{BinnedQuery, PlainEventsQuery}; +use netpod::query::BinnedQuery; +use netpod::query::PlainEventsQuery; use netpod::timeunits::*; use netpod::ChConf; -use netpod::{Channel, ChannelConfigQuery, FromUrl, ScalarType, Shape}; -use netpod::{ChannelConfigResponse, NodeConfigCached}; -use netpod::{ACCEPT_ALL, APP_JSON}; +use netpod::Channel; +use netpod::ChannelConfigQuery; +use netpod::ChannelConfigResponse; +use netpod::FromUrl; +use netpod::NodeConfigCached; +use netpod::ScalarType; +use netpod::Shape; +use netpod::ACCEPT_ALL; +use netpod::APP_JSON; use scylla::batch::Consistency; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::NewSessionError as ScyNewSessionError; use scylla::transport::errors::QueryError as ScyQueryError; use scylla::transport::iterator::NextRowError; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::collections::BTreeMap; use url::Url; -pub async fn chconf_from_events_binary(q: &PlainEventsQuery, ncc: &NodeConfigCached) -> Result { - chconf_from_database(q.channel(), ncc).await.map_err(Into::into) -} - -pub async fn chconf_from_events_json(q: &PlainEventsQuery, ncc: &NodeConfigCached) -> Result { - chconf_from_database(q.channel(), ncc).await.map_err(Into::into) +pub async fn chconf_from_events_v1(q: &PlainEventsQuery, ncc: &NodeConfigCached) -> Result { + let ret = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), ncc).await?; + Ok(ret) } pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) -> Result { @@ -46,7 +54,8 @@ pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) } pub async fn chconf_from_binned(q: &BinnedQuery, ncc: &NodeConfigCached) -> Result { - chconf_from_database(q.channel(), ncc).await.map_err(Into::into) + let ret = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), ncc).await?; + Ok(ret) } pub struct ChannelConfigHandler {} @@ -93,7 +102,7 @@ impl ChannelConfigHandler { let q = ChannelConfigQuery::from_url(&url)?; info!("channel_config for q {q:?}"); let conf = if let Some(_scyco) = &node_config.node_config.cluster.scylla { - let c = dbconn::channelconfig::chconf_from_database(&q.channel, node_config).await?; + let c = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?; ChannelConfigResponse { channel: Channel { series: Some(c.series), diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index a85a45e..b6e9b77 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -5,24 +5,27 @@ pub mod channel_status; pub mod channelconfig; pub mod download; pub mod err; -pub mod events; pub mod gather; pub mod prometheus; pub mod proxy; pub mod pulsemap; pub mod settings; -use self::bodystream::{BodyStream, ToPublicResponse}; +use self::bodystream::BodyStream; +use self::bodystream::ToPublicResponse; use crate::bodystream::response; use crate::err::Error; use crate::gather::gather_get_json; use crate::pulsemap::UpdateTask; -use futures_util::{Future, FutureExt, StreamExt}; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::StreamExt; use http::Method; use http::StatusCode; use hyper::server::conn::AddrStream; use hyper::server::Server; -use hyper::service::{make_service_fn, service_fn}; +use hyper::service::make_service_fn; +use hyper::service::service_fn; use hyper::Body; use hyper::Request; use hyper::Response; @@ -32,17 +35,22 @@ use netpod::query::prebinned::PreBinnedQuery; use netpod::timeunits::SEC; use netpod::NodeConfigCached; use netpod::ProxyConfig; -use netpod::{APP_JSON, APP_JSON_LINES}; +use netpod::APP_JSON; +use netpod::APP_JSON_LINES; use nodenet::conn::events_service; -use panic::{AssertUnwindSafe, UnwindSafe}; +use panic::AssertUnwindSafe; +use panic::UnwindSafe; use pin::Pin; use serde::Serialize; use std::collections::BTreeMap; use std::net; use std::panic; use std::pin; -use std::sync::atomic::{AtomicPtr, Ordering}; -use std::sync::{Once, RwLock, RwLockWriteGuard}; +use std::sync::atomic::AtomicPtr; +use std::sync::atomic::Ordering; +use std::sync::Once; +use std::sync::RwLock; +use std::sync::RwLockWriteGuard; use std::task; use std::time::SystemTime; use task::Context; @@ -302,7 +310,7 @@ async fn http_service_inner( h.handle(req, &node_config).await } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = events::EventsHandler::handler(&req) { + } else if let Some(h) = api4::events::EventsHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { h.handle(req, ctx, &node_config).await diff --git a/items/src/eventfull.rs b/items/src/eventfull.rs index a3816f5..f1ff9b8 100644 --- a/items/src/eventfull.rs +++ b/items/src/eventfull.rs @@ -1,70 +1,83 @@ +use crate::Appendable; +use crate::ByteEstimate; +use crate::Clearable; +use crate::FrameType; +use crate::FrameTypeInnerStatic; +use crate::PushableIndex; +use crate::WithLen; +use crate::WithTimestamps; use bytes::BytesMut; -use netpod::{ScalarType, Shape}; +use netpod::ScalarType; +use netpod::Shape; use parse::channelconfig::CompressionMethod; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; - -use crate::{ - Appendable, ByteEstimate, Clearable, FrameType, FrameTypeInnerStatic, PushableIndex, WithLen, WithTimestamps, -}; +use serde::Deserialize; +use serde::Deserializer; +use serde::Serialize; +use serde::Serializer; +use std::collections::VecDeque; #[derive(Debug, Serialize, Deserialize)] pub struct EventFull { - pub tss: Vec, - pub pulses: Vec, - pub blobs: Vec>, - #[serde(serialize_with = "decomps_ser", deserialize_with = "decomps_de")] + pub tss: VecDeque, + pub pulses: VecDeque, + pub blobs: VecDeque>, + #[serde(with = "decomps_serde")] // TODO allow access to `decomps` via method which checks first if `blobs` is already the decomp. - pub decomps: Vec>, - pub scalar_types: Vec, - pub be: Vec, - pub shapes: Vec, - pub comps: Vec>, + pub decomps: VecDeque>, + pub scalar_types: VecDeque, + pub be: VecDeque, + pub shapes: VecDeque, + pub comps: VecDeque>, } -fn decomps_ser(t: &Vec>, s: S) -> Result -where - S: Serializer, -{ - let a: Vec<_> = t - .iter() - .map(|k| match k { - None => None, - Some(j) => Some(j[..].to_vec()), - }) - .collect(); - Serialize::serialize(&a, s) -} +mod decomps_serde { + use super::*; -fn decomps_de<'de, D>(d: D) -> Result>, D::Error> -where - D: Deserializer<'de>, -{ - let a: Vec>> = Deserialize::deserialize(d)?; - let a = a - .iter() - .map(|k| match k { - None => None, - Some(j) => { - let mut a = BytesMut::new(); - a.extend_from_slice(&j); - Some(a) - } - }) - .collect(); - Ok(a) + pub fn serialize(t: &VecDeque>, s: S) -> Result + where + S: Serializer, + { + let a: Vec<_> = t + .iter() + .map(|k| match k { + None => None, + Some(j) => Some(j[..].to_vec()), + }) + .collect(); + Serialize::serialize(&a, s) + } + + pub fn deserialize<'de, D>(d: D) -> Result>, D::Error> + where + D: Deserializer<'de>, + { + let a: Vec>> = Deserialize::deserialize(d)?; + let a = a + .iter() + .map(|k| match k { + None => None, + Some(j) => { + let mut a = BytesMut::new(); + a.extend_from_slice(&j); + Some(a) + } + }) + .collect(); + Ok(a) + } } impl EventFull { pub fn empty() -> Self { Self { - tss: vec![], - pulses: vec![], - blobs: vec![], - decomps: vec![], - scalar_types: vec![], - be: vec![], - shapes: vec![], - comps: vec![], + tss: VecDeque::new(), + pulses: VecDeque::new(), + blobs: VecDeque::new(), + decomps: VecDeque::new(), + scalar_types: VecDeque::new(), + be: VecDeque::new(), + shapes: VecDeque::new(), + comps: VecDeque::new(), } } @@ -79,14 +92,14 @@ impl EventFull { shape: Shape, comp: Option, ) { - self.tss.push(ts); - self.pulses.push(pulse); - self.blobs.push(blob); - self.decomps.push(decomp); - self.scalar_types.push(scalar_type); - self.be.push(be); - self.shapes.push(shape); - self.comps.push(comp); + self.tss.push_back(ts); + self.pulses.push_back(pulse); + self.blobs.push_back(blob); + self.decomps.push_back(decomp); + self.scalar_types.push_back(scalar_type); + self.be.push_back(be); + self.shapes.push_back(shape); + self.comps.push_back(comp); } pub fn decomp(&self, i: usize) -> &[u8] { @@ -120,14 +133,14 @@ impl Appendable for EventFull { // TODO expensive, get rid of it. fn append(&mut self, src: &Self) { - self.tss.extend_from_slice(&src.tss); - self.pulses.extend_from_slice(&src.pulses); - self.blobs.extend_from_slice(&src.blobs); - self.decomps.extend_from_slice(&src.decomps); - self.scalar_types.extend_from_slice(&src.scalar_types); - self.be.extend_from_slice(&src.be); - self.shapes.extend_from_slice(&src.shapes); - self.comps.extend_from_slice(&src.comps); + self.tss.extend(&src.tss); + self.pulses.extend(&src.pulses); + self.blobs.extend(src.blobs.iter().map(Clone::clone)); + self.decomps.extend(src.decomps.iter().map(Clone::clone)); + self.scalar_types.extend(src.scalar_types.iter().map(Clone::clone)); + self.be.extend(&src.be); + self.shapes.extend(src.shapes.iter().map(Clone::clone)); + self.comps.extend(src.comps.iter().map(Clone::clone)); } fn append_zero(&mut self, _ts1: u64, _ts2: u64) { @@ -171,13 +184,13 @@ impl ByteEstimate for EventFull { impl PushableIndex for EventFull { // TODO check all use cases, can't we move? fn push_index(&mut self, src: &Self, ix: usize) { - self.tss.push(src.tss[ix]); - self.pulses.push(src.pulses[ix]); - self.blobs.push(src.blobs[ix].clone()); - self.decomps.push(src.decomps[ix].clone()); - self.scalar_types.push(src.scalar_types[ix].clone()); - self.be.push(src.be[ix]); - self.shapes.push(src.shapes[ix].clone()); - self.comps.push(src.comps[ix].clone()); + self.tss.push_back(src.tss[ix]); + self.pulses.push_back(src.pulses[ix]); + self.blobs.push_back(src.blobs[ix].clone()); + self.decomps.push_back(src.decomps[ix].clone()); + self.scalar_types.push_back(src.scalar_types[ix].clone()); + self.be.push_back(src.be[ix]); + self.shapes.push_back(src.shapes[ix].clone()); + self.comps.push_back(src.comps[ix].clone()); } } diff --git a/items/src/items.rs b/items/src/items.rs index 0c7b195..fa05355 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -453,6 +453,7 @@ pub trait FilterFittingInside: Sized { } pub trait PushableIndex { + // TODO get rid of usage, involves copy. // TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop? fn push_index(&mut self, src: &Self, ix: usize); } @@ -463,6 +464,8 @@ pub trait NewEmpty { pub trait Appendable: WithLen { fn empty_like_self(&self) -> Self; + + // TODO get rid of usage, involves copy. fn append(&mut self, src: &Self); // TODO the `ts2` makes no sense for non-bin-implementors diff --git a/items_0/src/scalar_ops.rs b/items_0/src/scalar_ops.rs index 0b6a2a1..bfdc8eb 100644 --- a/items_0/src/scalar_ops.rs +++ b/items_0/src/scalar_ops.rs @@ -52,6 +52,13 @@ impl AsPrimF32 for bool { } } +impl AsPrimF32 for String { + fn as_prim_f32_b(&self) -> f32 { + // Well, at least some impl. + self.len() as f32 + } +} + pub trait ScalarOps: fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static { @@ -67,7 +74,7 @@ macro_rules! impl_scalar_ops { } fn equal_slack(&self, rhs: &Self) -> bool { - $equal_slack(*self, *rhs) + $equal_slack(self, rhs) } } }; @@ -77,15 +84,19 @@ fn equal_int(a: T, b: T) -> bool { a == b } -fn equal_f32(a: f32, b: f32) -> bool { +fn equal_f32(&a: &f32, &b: &f32) -> bool { (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) } -fn equal_f64(a: f64, b: f64) -> bool { +fn equal_f64(&a: &f64, &b: &f64) -> bool { (a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001) } -fn equal_bool(a: bool, b: bool) -> bool { +fn equal_bool(&a: &bool, &b: &bool) -> bool { + a == b +} + +fn equal_string(a: &String, b: &String) -> bool { a == b } @@ -100,3 +111,4 @@ impl_scalar_ops!(i64, 0, equal_int); impl_scalar_ops!(f32, 0., equal_f32); impl_scalar_ops!(f64, 0., equal_f64); impl_scalar_ops!(bool, false, equal_bool); +impl_scalar_ops!(String, String::new(), equal_string); diff --git a/items_0/src/subfr.rs b/items_0/src/subfr.rs index a822bf5..e339fe6 100644 --- a/items_0/src/subfr.rs +++ b/items_0/src/subfr.rs @@ -45,3 +45,7 @@ impl SubFrId for f64 { impl SubFrId for bool { const SUB: u32 = 0x0d; } + +impl SubFrId for String { + const SUB: u32 = 0x0e; +} diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 5898454..54e99bb 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -269,6 +269,20 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector= 2 { + let mut print = false; + let c = self.vals.tss.len(); + if self.vals.tss[c - 2] + 1000000000 <= self.vals.tss[c - 1] { + print = true; + } + let c = self.vals.pulses.len(); + if self.vals.pulses[c - 2] + 1000 <= self.vals.pulses[c - 1] { + print = true; + } + if print { + error!("gap detected\n{self:?}"); + } + } } fn set_range_complete(&mut self) { @@ -303,13 +317,23 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector (VecDequ // TODO take iterator instead of slice, because a VecDeque can't produce a slice in general. pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque) { - let pulse_anchor = pulse.first().map_or(0, |k| *k); - let pulse_off = pulse.iter().map(|k| *k - pulse_anchor).collect(); + let pulse_anchor = pulse.first().map_or(0, |&k| k) / 10000 * 10000; + let pulse_off = pulse.iter().map(|&k| k - pulse_anchor).collect(); (pulse_anchor, pulse_off) } @@ -207,10 +207,14 @@ pub trait TimeBinnableTypeAggregator: Send { fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output; } -pub fn empty_events_dyn_2(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { - match shape { +pub fn empty_events_dyn_ev( + scalar_type: &ScalarType, + shape: &Shape, + agg_kind: &AggKind, +) -> Result, Error> { + let ret: Box = match shape { Shape::Scalar => match agg_kind { - AggKind::TimeWeightedScalar => { + AggKind::Plain | AggKind::TimeWeightedScalar => { use ScalarType::*; type K = eventsdim0::EventsDim0; match scalar_type { @@ -225,10 +229,7 @@ pub fn empty_events_dyn_2(scalar_type: &ScalarType, shape: &Shape, agg_kind: &Ag F32 => Box::new(K::::empty()), F64 => Box::new(K::::empty()), BOOL => Box::new(K::::empty()), - _ => { - error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); - err::todoval() - } + STRING => Box::new(K::::empty()), } } _ => { @@ -252,10 +253,7 @@ pub fn empty_events_dyn_2(scalar_type: &ScalarType, shape: &Shape, agg_kind: &Ag F32 => Box::new(K::::empty()), F64 => Box::new(K::::empty()), BOOL => Box::new(K::::empty()), - _ => { - error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); - err::todoval() - } + STRING => Box::new(K::::empty()), } } _ => { @@ -267,73 +265,11 @@ pub fn empty_events_dyn_2(scalar_type: &ScalarType, shape: &Shape, agg_kind: &Ag error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); err::todoval() } - } + }; + Ok(ret) } -// TODO needed any longer? -pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { - match shape { - Shape::Scalar => match agg_kind { - AggKind::TimeWeightedScalar => { - use ScalarType::*; - type K = eventsdim0::EventsDim0; - match scalar_type { - U8 => Box::new(K::::empty()), - U16 => Box::new(K::::empty()), - U32 => Box::new(K::::empty()), - U64 => Box::new(K::::empty()), - I8 => Box::new(K::::empty()), - I16 => Box::new(K::::empty()), - I32 => Box::new(K::::empty()), - I64 => Box::new(K::::empty()), - F32 => Box::new(K::::empty()), - F64 => Box::new(K::::empty()), - _ => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); - err::todoval() - } - } - } - _ => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); - err::todoval() - } - }, - Shape::Wave(..) => match agg_kind { - AggKind::Plain => { - use ScalarType::*; - type K = eventsdim1::EventsDim1; - match scalar_type { - U8 => Box::new(K::::empty()), - U16 => Box::new(K::::empty()), - U32 => Box::new(K::::empty()), - U64 => Box::new(K::::empty()), - I8 => Box::new(K::::empty()), - I16 => Box::new(K::::empty()), - I32 => Box::new(K::::empty()), - I64 => Box::new(K::::empty()), - F32 => Box::new(K::::empty()), - F64 => Box::new(K::::empty()), - BOOL => Box::new(K::::empty()), - _ => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); - err::todoval() - } - } - } - _ => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); - err::todoval() - } - }, - Shape::Image(..) => { - error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); - err::todoval() - } - } -} - -pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { +pub fn empty_binned_dyn_tb(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { match shape { Shape::Scalar => match agg_kind { AggKind::TimeWeightedScalar => { @@ -445,7 +381,7 @@ pub async fn binned_collected( let mut did_range_complete = false; let mut coll = None; let mut binner = None; - let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar); + let empty_item = empty_events_dyn_ev(&scalar_type, &shape, &AggKind::TimeWeightedScalar)?; let tmp_item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( empty_item, )))); @@ -533,7 +469,7 @@ pub async fn binned_collected( } None => { error!("binned_collected nothing collected"); - let item = empty_binned_dyn(&scalar_type, &shape, &AggKind::DimXBins1); + let item = empty_binned_dyn_tb(&scalar_type, &shape, &AggKind::DimXBins1); let ret = item.to_box_to_json_result(); tokio::time::sleep(Duration::from_millis(2000)).await; Ok(ret) diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs new file mode 100644 index 0000000..25a3991 --- /dev/null +++ b/nodenet/src/channelconfig.rs @@ -0,0 +1,84 @@ +use err::Error; +use netpod::log::*; +use netpod::ChConf; +use netpod::Channel; +use netpod::NanoRange; +use netpod::NodeConfigCached; +use netpod::ScalarType; +use netpod::Shape; + +pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfigCached) -> Result { + if channel.backend() == "test-disk-databuffer" { + let backend = channel.backend().into(); + // TODO the series-ids here are just random. Need to integrate with better test setup. + let ret = if channel.name() == "scalar-i32-be" { + let ret = ChConf { + backend, + series: 1, + name: channel.name().into(), + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + Ok(ret) + } else if channel.name() == "wave-f64-be-n21" { + let ret = ChConf { + backend, + series: 2, + name: channel.name().into(), + scalar_type: ScalarType::F64, + shape: Shape::Wave(21), + }; + Ok(ret) + } else if channel.name() == "const-regular-scalar-i32-be" { + let ret = ChConf { + backend, + series: 3, + name: channel.name().into(), + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + Ok(ret) + } else { + error!("no test information"); + Err(Error::with_msg_no_trace(format!("no test information")) + .add_public_msg("No channel config for test channel {:?}")) + }; + ret + } else if channel.backend() == "test-inmem" { + let backend = channel.backend().into(); + let ret = if channel.name() == "inmem-d0-i32" { + let ret = ChConf { + backend, + series: 1, + name: channel.name().into(), + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + Ok(ret) + } else { + error!("no test information"); + Err(Error::with_msg_no_trace(format!("no test information")) + .add_public_msg("No channel config for test channel {:?}")) + }; + ret + } else if ncc.node_config.cluster.scylla.is_some() { + info!("try to get ChConf for scylla type backend"); + let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) + .await + .map_err(Error::from)?; + Ok(ret) + } else if ncc.node.sf_databuffer.is_some() { + info!("try to get ChConf for sf-databuffer type backend"); + let c1 = disk::channelconfig::config(range, channel, ncc).await?; + let ret = ChConf { + backend: c1.channel.backend, + series: 0, + name: c1.channel.name, + scalar_type: c1.scalar_type, + shape: c1.shape, + }; + Ok(ret) + } else { + err::todoval() + } +} diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 351f86a..ade0f53 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -1,7 +1,6 @@ use err::Error; use futures_util::Stream; use futures_util::StreamExt; -use items::eventfull::EventFull; use items::frame::decode_frame; use items::frame::make_term_frame; use items::EventQueryJsonStringFrame; @@ -98,7 +97,7 @@ async fn make_channel_events_stream( // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. let do_one_before_range = false; // TODO use better builder pattern with shortcuts for production and dev defaults - let f = dbconn::channelconfig::chconf_from_database(evq.channel(), node_config) + let f = crate::channelconfig::channel_config(evq.range().clone(), evq.channel().clone(), node_config) .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scyco = conf; @@ -166,15 +165,6 @@ async fn make_channel_events_stream( } } -async fn make_event_blobs_stream( - evq: PlainEventsQuery, - node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> { - info!("make_event_blobs_stream"); - let stream = disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await?; - Ok(stream) -} - async fn events_conn_handler_inner_try( stream: TcpStream, addr: SocketAddr, @@ -247,7 +237,7 @@ async fn events_conn_handler_inner_try( let mut stream: Pin> + Send>> = if let AggKind::EventBlobs = evq.agg_kind() { - match make_event_blobs_stream(evq, node_config).await { + match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { Ok(stream) => { let stream = stream.map(|x| Box::new(x) as _); Box::pin(stream) @@ -273,7 +263,11 @@ async fn events_conn_handler_inner_try( let item = item.make_frame(); match item { Ok(buf) => { - trace!("write {} bytes", buf.len()); + if buf.len() > 1024 * 64 { + warn!("emit buf len {}", buf.len()); + } else { + trace!("emit buf len {}", buf.len()); + } buf_len_histo.ingest(buf.len() as u32); match netout.write_all(&buf).await { Ok(_) => {} diff --git a/nodenet/src/nodenet.rs b/nodenet/src/nodenet.rs index 80ac2a1..3970cf9 100644 --- a/nodenet/src/nodenet.rs +++ b/nodenet/src/nodenet.rs @@ -1 +1,2 @@ +pub mod channelconfig; pub mod conn; diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index aaed907..eddeba2 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -1,22 +1,33 @@ use crate::errconv::ErrConv; use crate::events::EventsStreamScylla; use err::Error; -use futures_util::{Future, Stream, StreamExt}; +use futures_util::Future; +use futures_util::Stream; +use futures_util::StreamExt; use items_0::TimeBinned; use items_2::binsdim0::BinsDim0; use items_2::channelevents::ChannelEvents; -use items_2::{empty_binned_dyn, empty_events_dyn}; +use items_2::empty_binned_dyn_tb; +use items_2::empty_events_dyn_ev; use netpod::log::*; -use netpod::query::{CacheUsage, PlainEventsQuery}; +use netpod::query::CacheUsage; +use netpod::query::PlainEventsQuery; use netpod::timeunits::*; -use netpod::{AggKind, ChannelTyped, ScalarType, Shape}; -use netpod::{PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange}; +use netpod::AggKind; +use netpod::ChannelTyped; +use netpod::PreBinnedPatchCoord; +use netpod::PreBinnedPatchIterator; +use netpod::PreBinnedPatchRange; +use netpod::ScalarType; +use netpod::Shape; use scylla::Session as ScySession; use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; pub async fn read_cached_scylla( series: u64, @@ -281,7 +292,7 @@ pub async fn fetch_uncached_higher_res_prebinned( // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. let do_time_weight = true; // We must produce some result with correct types even if upstream delivers nothing at all. - let bin0 = empty_binned_dyn(&chn.scalar_type, &chn.shape, &agg_kind); + let bin0 = empty_binned_dyn_tb(&chn.scalar_type, &chn.shape, &agg_kind); let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); let mut complete = true; let patch_it = PreBinnedPatchIterator::from_range(range.clone()); @@ -354,8 +365,11 @@ pub async fn fetch_uncached_binned_events( // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. let do_time_weight = true; // We must produce some result with correct types even if upstream delivers nothing at all. - let bin0 = empty_events_dyn(&chn.scalar_type, &chn.shape, &agg_kind); - let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); + //let bin0 = empty_events_dyn_tb(&chn.scalar_type, &chn.shape, &agg_kind); + //let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); + let mut time_binner = empty_events_dyn_ev(&chn.scalar_type, &chn.shape, &agg_kind)? + .as_time_binnable() + .time_binner_new(edges.clone(), do_time_weight); // TODO handle deadline better let deadline = Instant::now(); let deadline = deadline diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 4c7d072..b6fb73d 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -18,9 +18,7 @@ pub async fn plain_events_json( // TODO remove magic constant let deadline = Instant::now() + query.timeout() + Duration::from_millis(1000); let events_max = query.events_max(); - let _empty = items::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind()); - let _empty = items_2::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind()); - let empty = items_2::empty_events_dyn_2(&chconf.scalar_type, &chconf.shape, query.agg_kind()); + let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, query.agg_kind())?; let empty = ChannelEvents::Events(empty); let empty = items::sitem_data(empty); // TODO should be able to ask for data-events only, instead of mixed data and status events. diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index eae8c74..5317aab 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -20,9 +20,7 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu let do_time_weight = query.agg_kind().do_time_weighted(); let timeout = Duration::from_millis(7500); let deadline = Instant::now() + timeout; - let _empty = items::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind()); - let _empty = items_2::empty_events_dyn(&chconf.scalar_type, &chconf.shape, query.agg_kind()); - let empty = items_2::empty_events_dyn_2(&chconf.scalar_type, &chconf.shape, query.agg_kind()); + let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, query.agg_kind())?; let empty = ChannelEvents::Events(empty); let empty = items::sitem_data(empty); let rawquery = PlainEventsQuery::new( @@ -35,7 +33,7 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu ); let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&rawquery, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader: - netpod::log::info!("timebinned_json with empty item {empty:?}"); + info!("timebinned_json with empty item {empty:?}"); let stream = items_2::merger::Merger::new(inps, 128); let stream = stream::iter([empty]).chain(stream); let stream = Box::pin(stream);