From a2e17848bac29cb1fef86c2119e03856bf24dacb Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 30 Mar 2023 17:04:11 +0200 Subject: [PATCH] WIP --- daqbuffer/src/bin/daqbuffer.rs | 7 +- dbconn/src/channelconfig.rs | 4 +- dbconn/src/query.rs | 53 ++- disk/src/aggtest.rs | 18 +- disk/src/channelconfig.rs | 11 +- disk/src/dataopen.rs | 16 +- disk/src/decode.rs | 9 +- disk/src/disk.rs | 32 +- disk/src/eventblobs.rs | 29 +- disk/src/eventchunker.rs | 603 +++++++++++++++++++++++++++++++++ disk/src/gen.rs | 27 +- disk/src/paths.rs | 18 +- disk/src/raw/conn.rs | 51 ++- dq/src/bin/dq.rs | 8 +- httpret/src/api1.rs | 2 +- httpret/src/api4/binned.rs | 2 +- httpret/src/api4/events.rs | 4 +- httpret/src/channel_status.rs | 14 +- httpret/src/channelconfig.rs | 7 +- items_2/src/eventsdim0.rs | 3 +- items_2/src/merger.rs | 33 +- netpod/src/netpod.rs | 30 +- nodenet/src/channelconfig.rs | 16 +- nodenet/src/conn.rs | 75 ++-- nodenet/src/scylla.rs | 38 +-- query/src/transform.rs | 32 +- streams/src/eventchunker.rs | 603 --------------------------------- streams/src/plaineventsjson.rs | 6 +- 28 files changed, 924 insertions(+), 827 deletions(-) diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 7d1fcf3..4a236c3 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -5,6 +5,8 @@ use clap::Parser; use daqbuffer::cli::ClientType; use daqbuffer::cli::Opts; use daqbuffer::cli::SubCmd; +use disk::AggQuerySingleChannel; +use disk::SfDbChConf; use err::Error; use netpod::log::*; use netpod::query::CacheUsage; @@ -132,14 +134,13 @@ fn simple_fetch() { use netpod::timeunits::*; use netpod::ByteOrder; use netpod::Channel; - use netpod::ChannelConfig; use netpod::ScalarType; use netpod::Shape; taskrun::run(async { let _rh = daqbufp2::nodes::require_test_hosts_running()?; let t1 = chrono::Utc::now(); - let query = netpod::AggQuerySingleChannel { - channel_config: ChannelConfig { + let query = AggQuerySingleChannel { + channel_config: SfDbChConf { channel: Channel { backend: "sf-databuffer".into(), name: "S10BC01-DBAM070:BAM_CH1_NORM".into(), diff --git a/dbconn/src/channelconfig.rs b/dbconn/src/channelconfig.rs index ad33277..d05e4f2 100644 --- a/dbconn/src/channelconfig.rs +++ b/dbconn/src/channelconfig.rs @@ -45,7 +45,7 @@ pub async fn chconf_from_scylla_type_backend(channel: &Channel, ncc: &NodeConfig let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(2))?; let ret = ChConf { backend, - series, + series: Some(series), name, scalar_type, shape, @@ -78,7 +78,7 @@ pub async fn chconf_from_scylla_type_backend(channel: &Channel, ncc: &NodeConfig let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?; let ret = ChConf { backend, - series, + series: Some(series), name, scalar_type, shape, diff --git a/dbconn/src/query.rs b/dbconn/src/query.rs index 369949d..441e8ef 100644 --- a/dbconn/src/query.rs +++ b/dbconn/src/query.rs @@ -1,31 +1,50 @@ use crate::create_connection; use crate::ErrConv; use err::Error; +use netpod::log::*; use netpod::Channel; use netpod::NodeConfigCached; // For sf-databuffer backend, given a Channel, try to complete the information if only id is given. pub async fn sf_databuffer_fetch_channel_by_series(channel: Channel, ncc: &NodeConfigCached) -> Result { + info!("sf_databuffer_fetch_channel_by_series"); // TODO should not be needed at some point. if channel.backend().is_empty() || channel.name().is_empty() { - let series = channel - .series() - .ok_or_else(|| Error::with_msg_no_trace("no series id given"))? 
as i64; - let pgcon = create_connection(&ncc.node_config.cluster.database).await?; - let mut rows = pgcon - .query("select name from channels where rowid = $1", &[&series]) - .await - .err_conv()?; - if let Some(row) = rows.pop() { - let name: String = row.get(0); - let channel = Channel { - series: channel.series, - backend: ncc.node_config.cluster.backend.clone(), - name, - }; - Ok(channel) + if let Some(series) = channel.series() { + if series < 1 { + error!("sf_databuffer_fetch_channel_by_series bad input: {channel:?}"); + Err(Error::with_msg_no_trace(format!( + "sf_databuffer_fetch_channel_by_series bad input: {channel:?}" + ))) + } else { + info!("sf_databuffer_fetch_channel_by_series do the lookup"); + let series = channel + .series() + .ok_or_else(|| Error::with_msg_no_trace("no series id given"))? as i64; + let pgcon = create_connection(&ncc.node_config.cluster.database).await?; + let mut rows = pgcon + .query("select name from channels where rowid = $1", &[&series]) + .await + .err_conv()?; + if let Some(row) = rows.pop() { + info!("sf_databuffer_fetch_channel_by_series got a row {row:?}"); + let name: String = row.get(0); + let channel = Channel { + series: channel.series, + backend: ncc.node_config.cluster.backend.clone(), + name, + }; + info!("sf_databuffer_fetch_channel_by_series return {channel:?}"); + Ok(channel) + } else { + info!("sf_databuffer_fetch_channel_by_series nothing found"); + Err(Error::with_msg_no_trace("can not find series")) + } + } } else { - Err(Error::with_msg_no_trace("can not find series")) + Err(Error::with_msg_no_trace(format!( + "sf_databuffer_fetch_channel_by_series bad input: {channel:?}" + ))) } } else { Ok(channel) diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 7220b6d..3b5e7a5 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,17 +1,19 @@ use crate::eventblobs::EventChunkerMultifile; +use crate::eventchunker::EventChunkerConf; +use crate::AggQuerySingleChannel; +use crate::SfDbChConf; use netpod::range::evrange::NanoRange; use netpod::test_data_base_path_databuffer; use netpod::timeunits::*; use netpod::ByteOrder; use netpod::ByteSize; use netpod::Channel; -use netpod::ChannelConfig; +use netpod::DiskIoTune; use netpod::Node; use netpod::ScalarType; use netpod::SfDatabuffer; use netpod::Shape; use netpod::TsNano; -use streams::eventchunker::EventChunkerConf; pub fn make_test_node(id: u32) -> Node { Node { @@ -43,8 +45,8 @@ fn agg_x_dim_0() { async fn agg_x_dim_0_inner() { let node = make_test_node(0); - let query = netpod::AggQuerySingleChannel { - channel_config: ChannelConfig { + let query = AggQuerySingleChannel { + channel_config: SfDbChConf { channel: Channel { backend: "sf-databuffer".into(), name: "S10BC01-DBAM070:EOM1_T1".into(), @@ -68,7 +70,7 @@ async fn agg_x_dim_0_inner() { let range = NanoRange { beg: ts1, end: ts2 }; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); // TODO let upstream already provide DiskIoTune: - let mut disk_io_tune = netpod::DiskIoTune::default_for_testing(); + let mut disk_io_tune = DiskIoTune::default_for_testing(); disk_io_tune.read_buffer_len = query.buffer_size as usize; let fut1 = EventChunkerMultifile::new( range.clone(), @@ -100,8 +102,8 @@ async fn agg_x_dim_1_inner() { // /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/* // S10BC01-DBAM070:BAM_CH1_NORM let node = make_test_node(0); - let query = netpod::AggQuerySingleChannel { - channel_config: ChannelConfig { + let query = AggQuerySingleChannel { + channel_config: 
SfDbChConf { channel: Channel { backend: "ks".into(), name: "wave1".into(), @@ -125,7 +127,7 @@ async fn agg_x_dim_1_inner() { let range = NanoRange { beg: ts1, end: ts2 }; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); // TODO let upstream already provide DiskIoTune: - let mut disk_io_tune = netpod::DiskIoTune::default_for_testing(); + let mut disk_io_tune = DiskIoTune::default_for_testing(); disk_io_tune.read_buffer_len = query.buffer_size as usize; let fut1 = super::eventblobs::EventChunkerMultifile::new( range.clone(), diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index 88059ad..3514182 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -1,18 +1,15 @@ use err::Error; use netpod::range::evrange::NanoRange; use netpod::Channel; -use netpod::ChannelConfig; use netpod::NodeConfigCached; use parse::channelconfig::extract_matching_config_entry; use parse::channelconfig::read_local_config; use parse::channelconfig::ChannelConfigs; use parse::channelconfig::MatchingConfigEntry; -pub async fn config( - range: NanoRange, - channel: Channel, - node_config: &NodeConfigCached, -) -> Result { +use crate::SfDbChConf; + +pub async fn config(range: NanoRange, channel: Channel, node_config: &NodeConfigCached) -> Result { let channel_configs = read_local_config(channel.clone(), node_config.node.clone()).await?; let entry_res = match extract_matching_config_entry(&range, &channel_configs) { Ok(k) => k, @@ -37,7 +34,7 @@ pub async fn config( Ok(k) => k, Err(e) => return Err(e)?, }; - let channel_config = ChannelConfig { + let channel_config = SfDbChConf { channel: channel.clone(), keyspace: entry.ks as u8, time_bin_size: entry.bs.clone(), diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 74d8f42..cecdf0e 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -1,3 +1,5 @@ +use crate::SfDbChConf; + use super::paths; use bytes::BytesMut; use err::ErrStr; @@ -5,7 +7,6 @@ use err::Error; use futures_util::StreamExt; use netpod::log::*; use netpod::range::evrange::NanoRange; -use netpod::ChannelConfig; use netpod::Node; use netpod::TsNano; use std::fmt; @@ -207,7 +208,7 @@ impl fmt::Debug for OpenedFile { pub fn open_files( range: &NanoRange, - channel_config: &ChannelConfig, + channel_config: &SfDbChConf, node: Node, ) -> async_channel::Receiver> { let (chtx, chrx) = async_channel::bounded(2); @@ -236,7 +237,7 @@ pub fn open_files( async fn open_files_inner( chtx: &async_channel::Sender>, range: &NanoRange, - channel_config: &ChannelConfig, + channel_config: &SfDbChConf, node: Node, ) -> Result<(), Error> { let channel_config = channel_config.clone(); @@ -276,7 +277,7 @@ Expanded to one event before and after the requested range, if exists. 
*/ pub fn open_expanded_files( range: &NanoRange, - channel_config: &ChannelConfig, + channel_config: &SfDbChConf, node: Node, ) -> async_channel::Receiver> { let (chtx, chrx) = async_channel::bounded(2); @@ -297,7 +298,7 @@ pub fn open_expanded_files( chrx } -async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result, Error> { +async fn get_timebins(channel_config: &SfDbChConf, node: Node) -> Result, Error> { let mut timebins = Vec::new(); let p0 = paths::channel_timebins_dir_path(&channel_config, &node)?; match tokio::fs::read_dir(&p0).await { @@ -333,7 +334,7 @@ async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result>, range: &NanoRange, - channel_config: &ChannelConfig, + channel_config: &SfDbChConf, node: Node, ) -> Result<(), Error> { let channel_config = channel_config.clone(); @@ -414,7 +415,6 @@ mod test { use netpod::range::evrange::NanoRange; use netpod::test_data_base_path_databuffer; use netpod::timeunits::*; - use netpod::ChannelConfig; use std::path::PathBuf; use tokio::fs::OpenOptions; @@ -826,7 +826,7 @@ mod test { series: None, }; // TODO read config from disk? Or expose the config from data generator? - let channel_config = ChannelConfig { + let channel_config = SfDbChConf { channel: chn, keyspace: 2, time_bin_size: TsNano(DAY), diff --git a/disk/src/decode.rs b/disk/src/decode.rs index f9f5231..b047091 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -11,7 +11,6 @@ use items_0::WithLen; use items_2::eventfull::EventFull; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; -#[allow(unused)] use netpod::log::*; use netpod::AggKind; use netpod::ScalarType; @@ -25,13 +24,17 @@ use std::task::Poll; pub trait Endianness: Send + Unpin { fn is_big() -> bool; } + pub struct LittleEndian {} + pub struct BigEndian {} + impl Endianness for LittleEndian { fn is_big() -> bool { false } } + impl Endianness for BigEndian { fn is_big() -> bool { true @@ -369,7 +372,7 @@ impl EventsDynStream { fn handle_stream_item( &mut self, item: StreamItem>, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let ret = match item { StreamItem::DataItem(item) => match item { RangeCompletableItem::RangeComplete => { @@ -396,7 +399,7 @@ impl EventsDynStream { } impl Stream for EventsDynStream { - type Item = Sitemty>; + type Item = Sitemty>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 1095f45..bdd9db8 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -26,11 +26,16 @@ use futures_util::Stream; use futures_util::StreamExt; use futures_util::TryFutureExt; use netpod::log::*; -use netpod::ChannelConfig; +use netpod::ByteOrder; +use netpod::Channel; use netpod::DiskIoTune; use netpod::Node; use netpod::ReadSys; +use netpod::ScalarType; use netpod::Shape; +use netpod::TsNano; +use serde::Deserialize; +use serde::Serialize; use std::collections::VecDeque; use std::future::Future; use std::io::SeekFrom; @@ -55,8 +60,29 @@ use tokio::io::AsyncSeekExt; use tokio::io::ReadBuf; use tokio::sync::mpsc; +// TODO move to databuffer-specific crate +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SfDbChConf { + pub channel: Channel, + pub keyspace: u8, + pub time_bin_size: TsNano, + pub scalar_type: ScalarType, + pub compression: bool, + pub shape: Shape, + pub array: bool, + pub byte_order: ByteOrder, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AggQuerySingleChannel { + pub channel_config: SfDbChConf, + pub timebin: u32, 
+    pub tb_file_count: u32,
+    pub buffer_size: u32,
+}
+
 // TODO transform this into a self-test or remove.
-pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
+pub async fn read_test_1(query: &AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
     let path = paths::datapath(query.timebin as u64, &query.channel_config, 0, &node);
     debug!("try path: {:?}", path);
     let fin = OpenOptions::new().read(true).open(path).await?;
@@ -744,7 +770,7 @@ trait ChannelConfigExt {
     fn dtflags(&self) -> u8;
 }
 
-impl ChannelConfigExt for ChannelConfig {
+impl ChannelConfigExt for SfDbChConf {
     fn dtflags(&self) -> u8 {
         let mut ret = 0;
         if self.compression {
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index f618f49..9a092e3 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -1,6 +1,9 @@
 use crate::dataopen::open_expanded_files;
 use crate::dataopen::open_files;
 use crate::dataopen::OpenedFileSet;
+use crate::eventchunker::EventChunker;
+use crate::eventchunker::EventChunkerConf;
+use crate::SfDbChConf;
 use err::Error;
 use futures_util::Stream;
 use futures_util::StreamExt;
@@ -14,14 +17,12 @@ use items_2::merger::Merger;
 use netpod::log::*;
 use netpod::range::evrange::NanoRange;
 use netpod::timeunits::SEC;
-use netpod::ChannelConfig;
 use netpod::DiskIoTune;
 use netpod::Node;
+use std::collections::VecDeque;
 use std::pin::Pin;
 use std::task::Context;
 use std::task::Poll;
-use streams::eventchunker::EventChunker;
-use streams::eventchunker::EventChunkerConf;
 use streams::rangefilter2::RangeFilter2;
 
 pub trait InputTraits: Stream<Item = Sitemty<EventFull>> {}
 
 impl<T> InputTraits for T where T: Stream<Item = Sitemty<EventFull>> {}
 
 pub struct EventChunkerMultifile {
-    channel_config: ChannelConfig,
+    channel_config: SfDbChConf,
     file_chan: async_channel::Receiver<Result<OpenedFileSet, Error>>,
     evs: Option<Pin<Box<EventChunker>>>,
     disk_io_tune: DiskIoTune,
@@ -44,6 +45,7 @@ pub struct EventChunkerMultifile {
     emit_count: usize,
     do_emit_err_after: Option<usize>,
     range_final: bool,
+    log_queue: VecDeque<LogItem>,
     done: bool,
     done_emit_range_final: bool,
     complete: bool,
@@ -52,7 +54,7 @@ impl EventChunkerMultifile {
     pub fn new(
         range: NanoRange,
-        channel_config: ChannelConfig,
+        channel_config: SfDbChConf,
         node: Node,
         node_ix: usize,
         disk_io_tune: DiskIoTune,
@@ -83,6 +85,7 @@ impl EventChunkerMultifile {
             emit_count: 0,
             do_emit_err_after: None,
             range_final: false,
+            log_queue: VecDeque::new(),
             done: false,
             done_emit_range_final: false,
             complete: false,
@@ -98,7 +101,9 @@ impl Stream for EventChunkerMultifile {
         let _spg = span1.enter();
         use Poll::*;
         'outer: loop {
-            break if self.complete {
+            break if let Some(item) = self.log_queue.pop_front() {
+                Ready(Some(Ok(StreamItem::Log(item))))
+            } else if self.complete {
                 panic!("EventChunkerMultifile poll_next on complete");
             } else if self.done_emit_range_final {
                 self.complete = true;
@@ -122,6 +127,12 @@ impl Stream for EventChunkerMultifile {
                             if min <= self.max_ts {
                                 let msg = format!("EventChunkerMultifile repeated or unordered ts {}", min);
                                 error!("{}", msg);
+                                let item = LogItem {
+                                    node_ix: self.node_ix as _,
+                                    level: Level::INFO,
+                                    msg,
+                                };
+                                self.log_queue.push_back(item);
                             }
                             self.max_ts = max;
                             if let Some(after) = self.do_emit_err_after {
@@ -262,6 +273,8 @@ impl Stream for EventChunkerMultifile {
 #[cfg(test)]
 mod test {
     use crate::eventblobs::EventChunkerMultifile;
+    use crate::eventchunker::EventChunkerConf;
+    use crate::SfDbChConf;
     use err::Error;
     use futures_util::StreamExt;
     use items_0::streamitem::RangeCompletableItem;
@@ -272,10 +285,8 @@ mod test {
     use netpod::timeunits::DAY;
     use netpod::timeunits::MS;
     use netpod::ByteSize;
-    use netpod::ChannelConfig;
     use netpod::DiskIoTune;
     use netpod::TsNano;
-    use streams::eventchunker::EventChunkerConf;
     use streams::rangefilter2::RangeFilter2;
 
     fn read_expanded_for_range(range: NanoRange, nodeix: usize) -> Result<(usize, Vec<u64>), Error> {
         let chn = Channel {
             backend: "testbackend".into(),
             name: "scalar-i32-be".into(),
             series: None,
         };
         // TODO read config from disk.
-        let channel_config = ChannelConfig {
+        let channel_config = SfDbChConf {
             channel: chn,
             keyspace: 2,
             time_bin_size: TsNano(DAY),
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index 8b13789..a5b0755 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -1 +1,604 @@
+use crate::SfDbChConf;
+use bitshuffle::bitshuffle_decompress;
+use bytes::Buf;
+use bytes::BytesMut;
+use err::Error;
+use futures_util::Stream;
+use futures_util::StreamExt;
+use items_0::streamitem::RangeCompletableItem;
+use items_0::streamitem::StatsItem;
+use items_0::streamitem::StreamItem;
+use items_0::Empty;
+use items_0::WithLen;
+use items_2::eventfull::EventFull;
+use netpod::histo::HistoLog2;
+use netpod::log::*;
+use netpod::range::evrange::NanoRange;
+use netpod::timeunits::SEC;
+use netpod::ByteSize;
+use netpod::EventDataReadStats;
+use netpod::ScalarType;
+use netpod::Shape;
+use parse::channelconfig::CompressionMethod;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+use std::time::Instant;
+use streams::dtflags::*;
+use streams::filechunkread::FileChunkRead;
+use streams::needminbuffer::NeedMinBuffer;
+
+pub struct EventChunker {
+    inp: NeedMinBuffer,
+    state: DataFileState,
+    need_min: u32,
+    channel_config: SfDbChConf,
+    errored: bool,
+    completed: bool,
+    range: NanoRange,
+    stats_conf: EventChunkerConf,
+    seen_beyond_range: bool,
+    sent_beyond_range: bool,
+    data_emit_complete: bool,
+    final_stats_sent: bool,
+    parsed_bytes: u64,
+    dbg_path: PathBuf,
+    max_ts: u64,
+    expand: bool,
+    do_decompress: bool,
+    decomp_dt_histo: HistoLog2,
+    item_len_emit_histo: HistoLog2,
+    seen_before_range_count: usize,
+    seen_after_range_count: usize,
+    unordered_warn_count: usize,
+    repeated_ts_warn_count: usize,
+}
+
+impl Drop for EventChunker {
+    fn drop(&mut self) {
+        // TODO collect somewhere
+        debug!(
+            "EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}",
+            self.decomp_dt_histo, self.item_len_emit_histo
+        );
+    }
+}
+
+enum DataFileState {
+    FileHeader,
+    Event,
+}
+
+struct ParseResult {
+    events: EventFull,
+    parsed_bytes: u64,
+}
+
+#[derive(Clone, Debug)]
+pub struct EventChunkerConf {
+    pub disk_stats_every: ByteSize,
+}
+
+impl EventChunkerConf {
+    pub fn new(disk_stats_every: ByteSize) -> Self {
+        Self { disk_stats_every }
+    }
+}
+
+impl EventChunker {
+    // TODO `expand` flag usage
+    pub fn from_start(
+        inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
+        channel_config: SfDbChConf,
+        range: NanoRange,
+        stats_conf: EventChunkerConf,
+        dbg_path: PathBuf,
+        expand: bool,
+        do_decompress: bool,
+    ) -> Self {
+        info!("EventChunker::{} do_decompress {}", "from_start", do_decompress);
+        let mut inp = NeedMinBuffer::new(inp);
+        inp.set_need_min(6);
+        Self {
+            inp,
+            state: DataFileState::FileHeader,
+            need_min: 6,
+            channel_config,
+            errored: false,
+            completed: false,
+            range,
+            stats_conf,
+            seen_beyond_range: false,
+            sent_beyond_range: false,
+            data_emit_complete: false,
+            final_stats_sent: false,
+            parsed_bytes: 0,
+            dbg_path,
+            max_ts: 0,
+            expand,
+            do_decompress,
+            decomp_dt_histo: HistoLog2::new(8),
+            item_len_emit_histo: HistoLog2::new(0),
+            seen_before_range_count: 0,
+            seen_after_range_count: 0,
+            unordered_warn_count: 0,
+            repeated_ts_warn_count: 0,
+        }
+    }
+
+    // TODO `expand` flag usage
+    pub fn from_event_boundary(
+        inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
+        channel_config: SfDbChConf,
+        range: NanoRange,
+        stats_conf: EventChunkerConf,
+        dbg_path: PathBuf,
+        expand: bool,
+        do_decompress: bool,
+    ) -> Self {
+        info!(
+            "EventChunker::{} do_decompress {}",
+            "from_event_boundary", do_decompress
+        );
+        let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress);
+        ret.state = DataFileState::Event;
+        ret.need_min = 4;
+        ret.inp.set_need_min(4);
+        ret
+    }
+
+    fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
+        span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf))
+    }
+
+    fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
+        let mut ret = EventFull::empty();
+        let mut parsed_bytes = 0;
+        use byteorder::{ReadBytesExt, BE};
+        loop {
+            if (buf.len() as u32) < self.need_min {
+                break;
+            }
+            match self.state {
+                DataFileState::FileHeader => {
+                    if buf.len() < 6 {
+                        Err(Error::with_msg("need min 6 for FileHeader"))?;
+                    }
+                    let mut sl = std::io::Cursor::new(buf.as_ref());
+                    let fver = sl.read_i16::<BE>().unwrap();
+                    if fver != 0 {
+                        Err(Error::with_msg("unexpected data file version"))?;
+                    }
+                    let len = sl.read_i32::<BE>().unwrap();
+                    if len <= 0 || len >= 128 {
+                        Err(Error::with_msg("large channel header len"))?;
+                    }
+                    let totlen = len as usize + 2;
+                    if buf.len() < totlen {
+                        self.need_min = totlen as u32;
+                        break;
+                    } else {
+                        sl.advance(len as usize - 8);
+                        let len2 = sl.read_i32::<BE>().unwrap();
+                        if len != len2 {
+                            Err(Error::with_msg("channel header len mismatch"))?;
+                        }
+                        String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?;
+                        self.state = DataFileState::Event;
+                        self.need_min = 4;
+                        buf.advance(totlen);
+                        parsed_bytes += totlen as u64;
+                    }
+                }
+                DataFileState::Event => {
+                    let p0 = 0;
+                    let mut sl = std::io::Cursor::new(buf.as_ref());
+                    let len = sl.read_i32::<BE>().unwrap();
+                    if len < 20 || len > 1024 * 1024 * 20 {
+                        Err(Error::with_msg("unexpected large event chunk"))?;
+                    }
+                    let len = len as u32;
+                    if (buf.len() as u32) < len {
+                        self.need_min = len as u32;
+                        break;
+                    } else {
+                        let mut sl = std::io::Cursor::new(buf.as_ref());
+                        let len1b = sl.read_i32::<BE>().unwrap();
+                        assert!(len == len1b as u32);
+                        let _ttl = sl.read_i64::<BE>().unwrap();
+                        let ts = sl.read_i64::<BE>().unwrap() as u64;
+                        let pulse = sl.read_i64::<BE>().unwrap() as u64;
+                        if ts == self.max_ts {
+                            if self.repeated_ts_warn_count < 20 {
+                                let msg = format!(
+                                    "EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
+                                    self.repeated_ts_warn_count,
+                                    ts / SEC,
+                                    ts % SEC,
+                                    self.max_ts / SEC,
+                                    self.max_ts % SEC,
+                                    self.channel_config.shape,
+                                    self.dbg_path
+                                );
+                                warn!("{}", msg);
+                                self.repeated_ts_warn_count += 1;
+                            }
+                        }
+                        if ts < self.max_ts {
+                            if self.unordered_warn_count < 20 {
+                                let msg = format!(
+                                    "EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
+                                    self.unordered_warn_count,
+                                    ts / SEC,
+                                    ts % SEC,
+                                    self.max_ts / SEC,
+                                    self.max_ts % SEC,
+                                    self.channel_config.shape,
+                                    self.dbg_path
+                                );
+                                warn!("{}", msg);
+                                self.unordered_warn_count += 1;
+                                let e = Error::with_public_msg_no_trace(msg);
+                                return Err(e);
+                            }
+                        }
+                        self.max_ts = ts;
+                        if ts >= self.range.end {
+                            self.seen_after_range_count += 1;
+                            if !self.expand || self.seen_after_range_count >= 2 {
+                                self.seen_beyond_range = true;
+                                self.data_emit_complete = true;
+                                break;
+                            }
+                        }
+                        if ts < self.range.beg {
+                            self.seen_before_range_count += 1;
+                            if self.seen_before_range_count > 1 {
+                                let msg = format!(
+                                    "seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}",
+                                    ts / SEC,
+                                    ts % SEC,
+                                    self.range.beg / SEC,
+                                    self.range.beg % SEC,
+                                    self.range.end / SEC,
+                                    self.range.end % SEC,
+                                    pulse,
+                                    self.channel_config.shape,
+                                    self.dbg_path
+                                );
+                                warn!("{}", msg);
+                                let e = Error::with_public_msg(msg);
+                                Err(e)?;
+                            }
+                        }
+                        let _ioc_ts = sl.read_i64::<BE>().unwrap();
+                        let status = sl.read_i8().unwrap();
+                        let severity = sl.read_i8().unwrap();
+                        let optional = sl.read_i32::<BE>().unwrap();
+                        if status != 0 {
+                            Err(Error::with_msg(format!("status != 0: {}", status)))?;
+                        }
+                        if severity != 0 {
+                            Err(Error::with_msg(format!("severity != 0: {}", severity)))?;
+                        }
+                        if optional != -1 {
+                            Err(Error::with_msg(format!("optional != -1: {}", optional)))?;
+                        }
+                        let type_flags = sl.read_u8().unwrap();
+                        let type_index = sl.read_u8().unwrap();
+                        if type_index > 13 {
+                            Err(Error::with_msg(format!("type_index: {}", type_index)))?;
+                        }
+                        let scalar_type = ScalarType::from_dtype_index(type_index)?;
+                        let is_compressed = type_flags & COMPRESSION != 0;
+                        let is_array = type_flags & ARRAY != 0;
+                        let is_big_endian = type_flags & BIG_ENDIAN != 0;
+                        let is_shaped = type_flags & SHAPE != 0;
+                        if let Shape::Wave(_) = self.channel_config.shape {
+                            if !is_array {
+                                Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?;
+                            }
+                        }
+                        let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 };
+                        let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
+                        assert!(compression_method <= 0);
+                        assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
+                        let mut shape_lens = [0, 0, 0, 0];
+                        for i1 in 0..shape_dim {
+                            shape_lens[i1 as usize] = sl.read_u32::<BE>().unwrap();
+                        }
+                        let shape_this = {
+                            if is_shaped {
+                                if shape_dim == 1 {
+                                    Shape::Wave(shape_lens[0])
+                                } else if shape_dim == 2 {
+                                    Shape::Image(shape_lens[0], shape_lens[1])
+                                } else {
+                                    err::todoval()
+                                }
+                            } else {
+                                Shape::Scalar
+                            }
+                        };
+                        let comp_this = if is_compressed {
+                            if compression_method == 0 {
+                                Some(CompressionMethod::BitshuffleLZ4)
+                            } else {
+                                err::todoval()
+                            }
+                        } else {
+                            None
+                        };
+                        let p1 = sl.position();
+                        let k1 = len as u64 - (p1 - p0) - 4;
+                        if is_compressed {
+                            //debug!("event ts {} is_compressed {}", ts, is_compressed);
+                            let value_bytes = sl.read_u64::<BE>().unwrap();
+                            let block_size = sl.read_u32::<BE>().unwrap();
+                            //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
+                            match self.channel_config.shape {
+                                Shape::Scalar => {
+                                    assert!(value_bytes < 1024 * 1);
+                                }
+                                Shape::Wave(_) => {
+                                    assert!(value_bytes < 1024 * 64);
+                                }
+                                Shape::Image(_, _) => {
+                                    assert!(value_bytes < 1024 * 1024 * 20);
+                                }
+                            }
+                            assert!(block_size <= 1024 * 32);
+                            let type_size = scalar_type.bytes() as u32;
+                            let ele_count = value_bytes / type_size as u64;
+                            let ele_size = type_size;
+                            match self.channel_config.shape {
+                                Shape::Scalar => {
+                                    if is_array {
+                                        Err(Error::with_msg(format!(
+                                            "ChannelConfig expects Scalar but we find event is_array"
+                                        )))?;
+                                    }
+                                }
+                                Shape::Wave(dim1count) => {
+                                    if dim1count != ele_count as u32 {
+                                        Err(Error::with_msg(format!(
+                                            "ChannelConfig expects {:?} but event has ele_count {}",
+                                            self.channel_config.shape, ele_count,
+                                        )))?;
+                                    }
+                                }
+                                Shape::Image(n1, n2) => {
+                                    let nt = n1 as usize * n2 as usize;
+                                    if nt != ele_count as usize {
+                                        Err(Error::with_msg(format!(
+                                            "ChannelConfig expects {:?} but event has ele_count {}",
+                                            self.channel_config.shape, ele_count,
+                                        )))?;
+                                    }
+                                }
+                            }
+                            let data = &buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)];
+                            let decomp = {
+                                if self.do_decompress {
+                                    assert!(data.len() > 12);
+                                    let ts1 = Instant::now();
+                                    let decomp_bytes = (type_size * ele_count as u32) as usize;
+                                    let mut decomp = vec![0; decomp_bytes];
+                                    // TODO limit the buf slice range
+                                    match bitshuffle_decompress(
+                                        &data[12..],
+                                        &mut decomp,
+                                        ele_count as usize,
+                                        ele_size as usize,
+                                        0,
+                                    ) {
+                                        Ok(c1) => {
+                                            assert!(c1 as u64 + 12 == k1);
+                                            let ts2 = Instant::now();
+                                            let dt = ts2.duration_since(ts1);
+                                            // TODO analyze the histo
+                                            self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros());
+                                            Some(decomp)
+                                        }
+                                        Err(e) => {
+                                            return Err(Error::with_msg(format!("decompression failed {:?}", e)))?;
+                                        }
+                                    }
+                                } else {
+                                    None
+                                }
+                            };
+                            ret.add_event(
+                                ts,
+                                pulse,
+                                Some(data.to_vec()),
+                                decomp,
+                                ScalarType::from_dtype_index(type_index)?,
+                                is_big_endian,
+                                shape_this,
+                                comp_this,
+                            );
+                        } else {
+                            if len < p1 as u32 + 4 {
+                                let msg = format!("uncomp len: {} p1: {}", len, p1);
+                                Err(Error::with_msg(msg))?;
+                            }
+                            let vlen = len - p1 as u32 - 4;
+                            let data = &buf[p1 as usize..(p1 as u32 + vlen) as usize];
+                            ret.add_event(
+                                ts,
+                                pulse,
+                                Some(data.to_vec()),
+                                Some(data.to_vec()),
+                                ScalarType::from_dtype_index(type_index)?,
+                                is_big_endian,
+                                shape_this,
+                                comp_this,
+                            );
+                        }
+                        buf.advance(len as usize);
+                        parsed_bytes += len as u64;
+                        self.need_min = 4;
+                    }
+                }
+            }
+        }
+        Ok(ParseResult {
+            events: ret,
+            parsed_bytes,
+        })
+    }
+}
+
+impl Stream for EventChunker {
+    type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        'outer: loop {
+            break if self.completed {
+                panic!("EventChunker poll_next on completed");
+            } else if self.errored {
+                self.completed = true;
+                Ready(None)
+            } else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 {
+                let item = EventDataReadStats {
+                    parsed_bytes: self.parsed_bytes,
+                };
+                self.parsed_bytes = 0;
+                let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item));
+                Ready(Some(Ok(ret)))
+            } else if self.sent_beyond_range {
+                self.completed = true;
+                Ready(None)
+            } else if self.final_stats_sent {
+                self.sent_beyond_range = true;
+                trace!("sent_beyond_range");
+                if self.seen_beyond_range {
+                    trace!("sent_beyond_range RangeComplete");
+                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
+                } else {
+                    trace!("sent_beyond_range non-complete");
+                    continue 'outer;
+                }
+            } else if self.data_emit_complete {
+                let item = EventDataReadStats {
+                    parsed_bytes: self.parsed_bytes,
+                };
+                self.parsed_bytes = 0;
+                let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item));
+                self.final_stats_sent = true;
+                Ready(Some(Ok(ret)))
+            } else {
+                match self.inp.poll_next_unpin(cx) {
+                    Ready(Some(Ok(mut fcr))) => {
+                        if false {
+                            // TODO collect for stats:
+                            info!(
+                                "file read bytes {} ms {}",
+                                fcr.buf().len(),
+                                fcr.duration().as_millis()
+                            );
+                        }
+                        let r = self.parse_buf(fcr.buf_mut());
+                        match r {
+                            Ok(res) => {
+                                self.parsed_bytes += res.parsed_bytes;
+                                if fcr.buf().len() > 0 {
+                                    // TODO gather stats about this:
+                                    self.inp.put_back(fcr);
+                                }
+                                match self.channel_config.shape {
+                                    Shape::Scalar => {
+                                        if self.need_min > 1024 * 8 {
+                                            let msg =
+                                                format!("spurious EventChunker asks for need_min {}", self.need_min);
+                                            self.errored = true;
+ return Ready(Some(Err(Error::with_msg(msg)))); + } + } + Shape::Wave(_) => { + if self.need_min > 1024 * 32 { + let msg = + format!("spurious EventChunker asks for need_min {}", self.need_min); + self.errored = true; + return Ready(Some(Err(Error::with_msg(msg)))); + } + } + Shape::Image(_, _) => { + if self.need_min > 1024 * 1024 * 20 { + let msg = + format!("spurious EventChunker asks for need_min {}", self.need_min); + self.errored = true; + return Ready(Some(Err(Error::with_msg(msg)))); + } + } + } + let x = self.need_min; + self.inp.set_need_min(x); + if false { + info!( + "EventChunker emits {} events tss {:?}", + res.events.len(), + res.events.tss + ); + }; + self.item_len_emit_histo.ingest(res.events.len() as u32); + let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events)); + Ready(Some(Ok(ret))) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e.into()))) + } + } + } + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.data_emit_complete = true; + continue 'outer; + } + Pending => Pending, + } + }; + } + } +} + +#[cfg(test)] +mod test { + //use err::Error; + //use netpod::timeunits::*; + //use netpod::{ByteSize, Nanos}; + + /* + #[test] + fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> { + let chn = netpod::Channel { + backend: "testbackend".into(), + name: "scalar-i32-be".into(), + }; + // TODO read config from disk. + let channel_config = ChannelConfig { + channel: chn, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: netpod::ScalarType::I32, + byte_order: netpod::ByteOrder::big_endian(), + shape: netpod::Shape::Scalar, + array: false, + compression: false, + }; + let cluster = taskrun::test_cluster(); + let node = cluster.nodes[nodeix].clone(); + let buffer_size = 512; + let event_chunker_conf = EventChunkerConf { + disk_stats_every: ByteSize::kb(1024), + }; + } + */ +} diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 5f718d9..03b4373 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -1,4 +1,5 @@ use crate::ChannelConfigExt; +use crate::SfDbChConf; use bitshuffle::bitshuffle_compress; use bytes::BufMut; use bytes::BytesMut; @@ -7,7 +8,6 @@ use netpod::log::*; use netpod::timeunits::*; use netpod::ByteOrder; use netpod::Channel; -use netpod::ChannelConfig; use netpod::GenVar; use netpod::Node; use netpod::ScalarType; @@ -31,7 +31,7 @@ pub async fn gen_test_data() -> Result<(), Error> { }; { let chn = ChannelGenProps { - config: ChannelConfig { + config: SfDbChConf { channel: Channel { backend: backend.clone(), name: "scalar-i32-be".into(), @@ -50,7 +50,7 @@ pub async fn gen_test_data() -> Result<(), Error> { }; ensemble.channels.push(chn); let chn = ChannelGenProps { - config: ChannelConfig { + config: SfDbChConf { channel: Channel { backend: backend.clone(), name: "wave-f64-be-n21".into(), @@ -69,7 +69,7 @@ pub async fn gen_test_data() -> Result<(), Error> { }; ensemble.channels.push(chn); let chn = ChannelGenProps { - config: ChannelConfig { + config: SfDbChConf { channel: Channel { backend: backend.clone(), name: "wave-u16-le-n77".into(), @@ -88,7 +88,7 @@ pub async fn gen_test_data() -> Result<(), Error> { }; ensemble.channels.push(chn); let chn = ChannelGenProps { - config: ChannelConfig { + config: SfDbChConf { channel: Channel { backend: backend.clone(), name: "tw-scalar-i32-be".into(), @@ -107,7 +107,7 @@ pub async fn gen_test_data() -> Result<(), Error> { }; ensemble.channels.push(chn); let chn = ChannelGenProps { - 
config: ChannelConfig { + config: SfDbChConf { channel: Channel { backend: backend.clone(), name: "const-regular-scalar-i32-be".into(), @@ -156,7 +156,7 @@ struct Ensemble { } pub struct ChannelGenProps { - config: ChannelConfig, + config: SfDbChConf, time_spacing: u64, gen_var: GenVar, } @@ -204,12 +204,7 @@ async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: & Ok(()) } -async fn gen_config( - config_path: &Path, - config: &ChannelConfig, - _node: &Node, - _ensemble: &Ensemble, -) -> Result<(), Error> { +async fn gen_config(config_path: &Path, config: &SfDbChConf, _node: &Node, _ensemble: &Ensemble) -> Result<(), Error> { let path = config_path.join("latest"); tokio::fs::create_dir_all(&path).await?; let path = path.join("00000_Config"); @@ -337,7 +332,7 @@ async fn gen_timebin( pulse: u64, ts_spacing: u64, channel_path: &Path, - config: &ChannelConfig, + config: &SfDbChConf, split: u32, _node: &Node, ensemble: &Ensemble, @@ -406,7 +401,7 @@ async fn gen_timebin( Ok(ret) } -async fn gen_datafile_header(file: &mut CountedFile, config: &ChannelConfig) -> Result<(), Error> { +async fn gen_datafile_header(file: &mut CountedFile, config: &SfDbChConf) -> Result<(), Error> { let mut buf = BytesMut::with_capacity(1024); let cnenc = config.channel.name.as_bytes(); let len1 = cnenc.len() + 8; @@ -424,7 +419,7 @@ async fn gen_event( evix: u64, ts: TsNano, pulse: u64, - config: &ChannelConfig, + config: &SfDbChConf, gen_var: &GenVar, ) -> Result<(), Error> { let ttl = 0xcafecafe; diff --git a/disk/src/paths.rs b/disk/src/paths.rs index 68790c0..b3e9778 100644 --- a/disk/src/paths.rs +++ b/disk/src/paths.rs @@ -1,13 +1,13 @@ +use crate::SfDbChConf; use err::Error; use futures_util::StreamExt; use netpod::timeunits::MS; -use netpod::ChannelConfig; use netpod::Node; use netpod::TsNano; use std::path::PathBuf; // TODO remove/replace this -pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, split: u32, node: &Node) -> PathBuf { +pub fn datapath(timebin: u64, config: &SfDbChConf, split: u32, node: &Node) -> PathBuf { node.sf_databuffer .as_ref() .unwrap() @@ -30,11 +30,7 @@ Return potential datafile paths for the given timebin. It says "potential datafile paths" because we don't open the file here yet and of course, files may vanish until then. Also, the timebin may actually not exist. */ -pub async fn datapaths_for_timebin( - timebin: u64, - config: &netpod::ChannelConfig, - node: &Node, -) -> Result, Error> { +pub async fn datapaths_for_timebin(timebin: u64, config: &SfDbChConf, node: &Node) -> Result, Error> { let sfc = node.sf_databuffer.as_ref().unwrap(); let timebin_path = sfc .data_base_path @@ -84,7 +80,7 @@ pub async fn datapaths_for_timebin( Ok(ret) } -pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Result { +pub fn channel_timebins_dir_path(channel_config: &SfDbChConf, node: &Node) -> Result { let sfc = node.sf_databuffer.as_ref().unwrap(); let ret = sfc .data_base_path @@ -94,20 +90,20 @@ pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Ok(ret) } -pub fn data_dir_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { +pub fn data_dir_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result { let ret = channel_timebins_dir_path(channel_config, node)? 
.join(format!("{:019}", ts.ns() / channel_config.time_bin_size.ns())) .join(format!("{:010}", split)); Ok(ret) } -pub fn data_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { +pub fn data_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result { let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns() / MS, 0); let ret = data_dir_path(ts, channel_config, split, node)?.join(fname); Ok(ret) } -pub fn index_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { +pub fn index_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result { let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns() / MS, 0); let ret = data_dir_path(ts, channel_config, split, node)?.join(fname); Ok(ret) diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index ed906c9..776e764 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,4 +1,6 @@ use crate::eventblobs::EventChunkerMultifile; +use crate::eventchunker::EventChunkerConf; +use crate::SfDbChConf; use err::Error; use futures_util::stream; use futures_util::Stream; @@ -12,25 +14,24 @@ use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::AggKind; use netpod::ByteSize; +use netpod::ChConf; use netpod::Channel; use netpod::DiskIoTune; use netpod::NodeConfigCached; -use netpod::ScalarType; -use netpod::Shape; use parse::channelconfig::extract_matching_config_entry; use parse::channelconfig::read_local_config; use parse::channelconfig::ConfigEntry; use parse::channelconfig::MatchingConfigEntry; use query::api4::events::PlainEventsQuery; use std::pin::Pin; -use streams::eventchunker::EventChunkerConf; fn make_num_pipeline_stream_evs( - scalar_type: ScalarType, - shape: Shape, + chconf: ChConf, agg_kind: AggKind, event_blobs: EventChunkerMultifile, ) -> Pin> + Send>> { + let scalar_type = chconf.scalar_type.clone(); + let shape = chconf.shape.clone(); let event_stream = match crate::decode::EventsDynStream::new(scalar_type, shape, agg_kind, event_blobs) { Ok(k) => k, Err(e) => { @@ -55,18 +56,33 @@ fn make_num_pipeline_stream_evs( pub async fn make_event_pipe( evq: &PlainEventsQuery, - node_config: &NodeConfigCached, + chconf: ChConf, + ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { info!("---------- disk::raw::conn::make_event_pipe"); if false { - match dbconn::channel_exists(&evq.channel(), &node_config).await { + match dbconn::channel_exists(&evq.channel(), &ncc).await { Ok(_) => (), Err(e) => return Err(e)?, } } + let chn = evq.channel().clone(); + let chn = if chn.name().is_empty() { + if let Some(series) = chn.series() { + if series < 1 { + error!("BAD QUERY: {evq:?}"); + return Err(Error::with_msg_no_trace(format!("BAD QUERY: {evq:?}"))); + } else { + dbconn::query::sf_databuffer_fetch_channel_by_series(chn, ncc).await? + } + } else { + chn + } + } else { + chn + }; let range = evq.range().clone(); - let channel_config = - crate::channelconfig::config(evq.range().try_into()?, evq.channel().clone(), node_config).await; + let channel_config = crate::channelconfig::config(evq.range().try_into()?, chn, ncc).await; let channel_config = match channel_config { Ok(x) => x, Err(e) => { @@ -80,14 +96,14 @@ pub async fn make_event_pipe( } } }; - trace!( + info!( "make_event_pipe need_expand {need_expand} {evq:?}", need_expand = evq.one_before_range() ); let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); // TODO should not need this for correctness. 
// Should limit based on return size and latency. - let out_max_len = if node_config.node_config.cluster.is_central_storage { + let out_max_len = if ncc.node_config.cluster.is_central_storage { 128 } else { 128 @@ -95,18 +111,16 @@ pub async fn make_event_pipe( let event_blobs = EventChunkerMultifile::new( (&range).try_into()?, channel_config.clone(), - node_config.node.clone(), - node_config.ix, + ncc.node.clone(), + ncc.ix, DiskIoTune::default(), event_chunker_conf, evq.one_before_range(), true, out_max_len, ); - let scalar_type = channel_config.scalar_type.clone(); - let shape = channel_config.shape.clone(); error!("TODO replace AggKind in the called code"); - let pipe = make_num_pipeline_stream_evs(scalar_type, shape.clone(), AggKind::TimeWeightedScalar, event_blobs); + let pipe = make_num_pipeline_stream_evs(chconf, AggKind::TimeWeightedScalar, event_blobs); Ok(pipe) } @@ -115,6 +129,7 @@ pub async fn get_applicable_entry( channel: Channel, node_config: &NodeConfigCached, ) -> Result { + info!("---------- disk::raw::conn::get_applicable_entry"); let channel_config = read_local_config(channel.clone(), node_config.node.clone()).await?; let entry_res = match extract_matching_config_entry(range, &channel_config) { Ok(k) => k, @@ -156,7 +171,7 @@ pub fn make_local_event_blobs_stream( Ok(k) => k, Err(e) => return Err(e)?, }; - let channel_config = netpod::ChannelConfig { + let channel_config = SfDbChConf { channel, keyspace: entry.ks as u8, time_bin_size: entry.bs.clone(), @@ -202,7 +217,7 @@ pub fn make_remote_event_blobs_stream( Ok(k) => k, Err(e) => return Err(e)?, }; - let channel_config = netpod::ChannelConfig { + let channel_config = SfDbChConf { channel, keyspace: entry.ks as u8, time_bin_size: entry.bs.clone(), diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index 7f2f214..284fd9a 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -1,5 +1,8 @@ use clap::ArgAction; use clap::Parser; +use disk::eventchunker::EventChunker; +use disk::eventchunker::EventChunkerConf; +use disk::SfDbChConf; use err::Error; #[allow(unused)] use netpod::log::*; @@ -7,11 +10,8 @@ use netpod::range::evrange::NanoRange; use netpod::ByteOrder; use netpod::ByteSize; use netpod::Channel; -use netpod::ChannelConfig; use netpod::Shape; use std::path::PathBuf; -use streams::eventchunker::EventChunker; -use streams::eventchunker::EventChunkerConf; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -80,7 +80,7 @@ pub fn main() -> Result<(), Error> { let disk_io_tune = netpod::DiskIoTune::default(); let inp = Box::pin(disk::file_content_stream(path.clone(), file, disk_io_tune)); let ce = &config.entries[0]; - let channel_config = ChannelConfig { + let channel_config = SfDbChConf { channel: Channel { backend: String::new(), name: config.channel_name.clone(), diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 043241e..9acaf85 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -6,6 +6,7 @@ use crate::BodyStream; use crate::ReqCtx; use bytes::BufMut; use bytes::BytesMut; +use disk::eventchunker::EventChunkerConf; use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes; use disk::raw::conn::make_local_event_blobs_stream; use futures_util::stream; @@ -59,7 +60,6 @@ use std::task::Context; use std::task::Poll; use std::time::Duration; use std::time::Instant; -use streams::eventchunker::EventChunkerConf; use tracing_futures::Instrument; use url::Url; diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index 5a948e9..6ee11e2 100644 --- a/httpret/src/api4/binned.rs +++ 
b/httpret/src/api4/binned.rs @@ -30,7 +30,7 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache let chconf = chconf_from_binned(&query, node_config).await?; // Update the series id since we don't require some unique identifier yet. let mut query = query; - query.set_series_id(chconf.series); + query.set_series_id(chconf.try_series()?); let query = query; // --- let span1 = span!( diff --git a/httpret/src/api4/events.rs b/httpret/src/api4/events.rs index d6ae523..9573cd8 100644 --- a/httpret/src/api4/events.rs +++ b/httpret/src/api4/events.rs @@ -78,7 +78,7 @@ async fn plain_events_binary( info!("plain_events_binary chconf_from_events_v1: {chconf:?}"); // Update the series id since we don't require some unique identifier yet. let mut query = query; - query.set_series_id(chconf.series); + query.set_series_id(chconf.try_series()?); let query = query; // --- let _ = query; @@ -103,7 +103,7 @@ async fn plain_events_json( info!("plain_events_json chconf_from_events_v1: {chconf:?}"); // Update the series id since we don't require some unique identifier yet. let mut query = query; - query.set_series_id(chconf.series); + query.set_series_id(chconf.try_series()?); let query = query; // --- //let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain); diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index c3bf227..1db7378 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -74,7 +74,8 @@ impl ConnectionStatusEvents { .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let _scy = scyllaconn::create_scy_session(scyco).await?; - let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?; + let chconf = + nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?; let _series = chconf.series; let _do_one_before_range = true; let ret = Vec::new(); @@ -148,10 +149,15 @@ impl ChannelStatusEvents { .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let scy = scyllaconn::create_scy_session(scyco).await?; - let chconf = nodenet::channelconfig::channel_config(q.range().clone(),q.channel().clone(), node_config).await?; + let chconf = + nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?; let do_one_before_range = true; - let mut stream = - scyllaconn::status::StatusStreamScylla::new(chconf.series, q.range().clone(), do_one_before_range, scy); + let mut stream = scyllaconn::status::StatusStreamScylla::new( + chconf.try_series()?, + q.range().clone(), + do_one_before_range, + scy, + ); let mut ret = Vec::new(); while let Some(item) = stream.next().await { let item = item?; diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index b675360..a68c991 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -42,10 +42,7 @@ pub async fn chconf_from_events_v1(q: &PlainEventsQuery, ncc: &NodeConfigCached) pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) -> Result { let ret = ChConf { backend: q.channel().backend().into(), - series: q - .channel() - .series() - .expect("PreBinnedQuery is expected to contain the series id"), + series: q.channel().series().clone(), name: q.channel().name().into(), scalar_type: q.scalar_type().clone(), shape: q.shape().clone(), @@ -105,7 +102,7 @@ impl ChannelConfigHandler { let c = 
nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?; ChannelConfigResponse { channel: Channel { - series: Some(c.series), + series: c.series.clone(), backend: q.channel.backend().into(), name: c.name, }, diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 03c6522..6017b7a 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -751,8 +751,7 @@ impl TimeBinnable for EventsDim0 { impl TypeName for EventsDim0 { fn type_name(&self) -> String { let self_name = any::type_name::(); - let sty = any::type_name::(); - format!("EventsDim0<{sty}> aka {self_name}<{sty}>") + format!("{self_name}") } } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index a0ff972..f3b0c3e 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -1,9 +1,9 @@ -pub use crate::Error; - +use crate::Error; use futures_util::Stream; use futures_util::StreamExt; use items_0::container::ByteEstimate; use items_0::streamitem::sitem_data; +use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -20,6 +20,7 @@ use std::task::Context; use std::task::Poll; const OUT_MAX_BYTES: u64 = 1024 * 200; +const DO_DETECT_NON_MONO: bool = true; #[allow(unused)] macro_rules! trace2 { @@ -60,6 +61,8 @@ pub struct Merger { out_max_len: usize, range_complete: Vec, out_of_band_queue: VecDeque>, + log_queue: VecDeque, + dim0ix_max: u64, done_data: bool, done_buffered: bool, done_range_complete: bool, @@ -100,6 +103,8 @@ where out_max_len, range_complete: vec![false; n], out_of_band_queue: VecDeque::new(), + log_queue: VecDeque::new(), + dim0ix_max: 0, done_data: false, done_buffered: false, done_range_complete: false, @@ -144,6 +149,7 @@ where fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { use ControlFlow::*; trace4!("process"); + let mut log_items = Vec::new(); let mut tslows = [None, None]; for (i1, itemopt) in self.items.iter_mut().enumerate() { if let Some(item) = itemopt { @@ -168,12 +174,31 @@ where } } else { // the item seems empty. + // TODO count for stats. 
trace2!("empty item, something to do here?"); *itemopt = None; return Ok(Continue(())); } } } + if DO_DETECT_NON_MONO { + if let Some((i1, t1)) = tslows[0].as_ref() { + if *t1 <= self.dim0ix_max { + self.dim0ix_max = *t1; + let item = LogItem { + node_ix: *i1 as _, + level: Level::INFO, + msg: format!( + "dim0ix_max {} vs {} diff {}", + self.dim0ix_max, + t1, + self.dim0ix_max - t1 + ), + }; + log_items.push(item); + } + } + } trace4!("tslows {tslows:?}"); if let Some((il0, _tl0)) = tslows[0] { if let Some((_il1, tl1)) = tslows[1] { @@ -367,7 +392,9 @@ where let _spg = span1.enter(); loop { trace3!("poll"); - break if self.poll_count == usize::MAX { + break if let Some(item) = self.log_queue.pop_front() { + Ready(Some(Ok(StreamItem::Log(item)))) + } else if self.poll_count == usize::MAX { self.done_range_complete = true; continue; } else if self.complete { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 57adfad..3496da7 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -62,14 +62,6 @@ impl CmpZero for u32 { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct AggQuerySingleChannel { - pub channel_config: ChannelConfig, - pub timebin: u32, - pub tb_file_count: u32, - pub buffer_size: u32, -} - pub struct BodyStream { //pub receiver: async_channel::Receiver>, pub inner: Box> + Send + Unpin>, @@ -766,19 +758,6 @@ pub enum GenVar { ConstRegular, } -// TODO move to databuffer-specific crate -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ChannelConfig { - pub channel: Channel, - pub keyspace: u8, - pub time_bin_size: TsNano, - pub scalar_type: ScalarType, - pub compression: bool, - pub shape: Shape, - pub array: bool, - pub byte_order: ByteOrder, -} - #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] pub enum ShapeOld { Scalar, @@ -2293,12 +2272,19 @@ pub struct ChannelInfo { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChConf { pub backend: String, - pub series: u64, + pub series: Option, pub name: String, pub scalar_type: ScalarType, pub shape: Shape, } +impl ChConf { + pub fn try_series(&self) -> Result { + self.series + .ok_or_else(|| Error::with_msg_no_trace("ChConf without SeriesId")) + } +} + pub fn f32_close(a: f32, b: f32) -> bool { if (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) { true diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index a3f0b6e..3a48073 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -15,7 +15,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig let ret = if channel.name() == "scalar-i32-be" { let ret = ChConf { backend, - series: 1, + series: Some(1), name: channel.name().into(), scalar_type: ScalarType::I32, shape: Shape::Scalar, @@ -24,7 +24,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig } else if channel.name() == "wave-f64-be-n21" { let ret = ChConf { backend, - series: 2, + series: Some(2), name: channel.name().into(), scalar_type: ScalarType::F64, shape: Shape::Wave(21), @@ -33,7 +33,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig } else if channel.name() == "const-regular-scalar-i32-be" { let ret = ChConf { backend, - series: 3, + series: Some(3), name: channel.name().into(), scalar_type: ScalarType::I32, shape: Shape::Scalar, @@ -50,7 +50,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig let ret = if channel.name() == "inmem-d0-i32" { let ret = 
ChConf { backend, - series: 1, + series: Some(1), name: channel.name().into(), scalar_type: ScalarType::I32, shape: Shape::Scalar, @@ -69,12 +69,16 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig .map_err(Error::from)?; Ok(ret) } else if ncc.node.sf_databuffer.is_some() { + info!("channel_config BEFORE {channel:?}"); info!("try to get ChConf for sf-databuffer type backend"); + // TODO in the future we should not need this: let channel = sf_databuffer_fetch_channel_by_series(channel, ncc).await?; - let c1 = disk::channelconfig::config(range, channel, ncc).await?; + info!("channel_config AFTER {channel:?}"); + let c1 = disk::channelconfig::config(range, channel.clone(), ncc).await?; + info!("channel_config THEN {c1:?}"); let ret = ChConf { backend: c1.channel.backend, - series: 0, + series: channel.series, name: c1.channel.name, scalar_type: c1.scalar_type, shape: c1.shape, diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index e1f9bc7..912f430 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -14,9 +14,11 @@ use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; use items_2::frame::decode_frame; use items_2::frame::make_term_frame; +use items_2::inmem::InMemoryFrame; use netpod::histo::HistoLog2; use netpod::log::*; use netpod::AggKind; +use netpod::ChConf; use netpod::NodeConfigCached; use netpod::PerfOpts; use query::api4::events::PlainEventsQuery; @@ -24,6 +26,7 @@ use std::net::SocketAddr; use std::pin::Pin; use streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncWriteExt; +use tokio::net::tcp::OwnedReadHalf; use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; @@ -63,6 +66,7 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { async fn make_channel_events_stream( evq: PlainEventsQuery, + chconf: ChConf, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { info!("nodenet::conn::make_channel_events_stream"); @@ -100,7 +104,7 @@ async fn make_channel_events_stream( Ok(Box::pin(stream)) } } else if let Some(scyconf) = &node_config.node_config.cluster.scylla { - scylla_channel_event_stream(evq, scyconf, node_config).await + scylla_channel_event_stream(evq, chconf, scyconf, node_config).await } else if let Some(_) = &node_config.node.channel_archiver { let e = Error::with_msg_no_trace("archapp not built"); Err(e) @@ -108,17 +112,11 @@ async fn make_channel_events_stream( let e = Error::with_msg_no_trace("archapp not built"); Err(e) } else { - Ok(disk::raw::conn::make_event_pipe(&evq, node_config).await?) + Ok(disk::raw::conn::make_event_pipe(&evq, chconf, node_config).await?) 
diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs
index e1f9bc7..912f430 100644
--- a/nodenet/src/conn.rs
+++ b/nodenet/src/conn.rs
@@ -14,9 +14,11 @@ use items_2::framable::EventQueryJsonStringFrame;
 use items_2::framable::Framable;
 use items_2::frame::decode_frame;
 use items_2::frame::make_term_frame;
+use items_2::inmem::InMemoryFrame;
 use netpod::histo::HistoLog2;
 use netpod::log::*;
 use netpod::AggKind;
+use netpod::ChConf;
 use netpod::NodeConfigCached;
 use netpod::PerfOpts;
 use query::api4::events::PlainEventsQuery;
@@ -24,6 +26,7 @@ use std::net::SocketAddr;
 use std::pin::Pin;
 use streams::frames::inmem::InMemoryFrameAsyncReadStream;
 use tokio::io::AsyncWriteExt;
+use tokio::net::tcp::OwnedReadHalf;
 use tokio::net::tcp::OwnedWriteHalf;
 use tokio::net::TcpStream;
 use tracing::Instrument;
@@ -63,6 +66,7 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
 
 async fn make_channel_events_stream(
     evq: PlainEventsQuery,
+    chconf: ChConf,
     node_config: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
     info!("nodenet::conn::make_channel_events_stream");
@@ -100,7 +104,7 @@ async fn make_channel_events_stream(
             Ok(Box::pin(stream))
         }
     } else if let Some(scyconf) = &node_config.node_config.cluster.scylla {
-        scylla_channel_event_stream(evq, scyconf, node_config).await
+        scylla_channel_event_stream(evq, chconf, scyconf, node_config).await
     } else if let Some(_) = &node_config.node.channel_archiver {
         let e = Error::with_msg_no_trace("archapp not built");
         Err(e)
@@ -108,17 +112,11 @@ async fn make_channel_events_stream(
         let e = Error::with_msg_no_trace("archapp not built");
         Err(e)
     } else {
-        Ok(disk::raw::conn::make_event_pipe(&evq, node_config).await?)
+        Ok(disk::raw::conn::make_event_pipe(&evq, chconf, node_config).await?)
     }
 }
 
-async fn events_conn_handler_inner_try(
-    stream: TcpStream,
-    addr: SocketAddr,
-    node_config: &NodeConfigCached,
-) -> Result<(), ConnErr> {
-    let _ = addr;
-    let (netin, mut netout) = stream.into_split();
+async fn events_get_input_frames(netin: OwnedReadHalf) -> Result<Vec<InMemoryFrame>, Error> {
     warn!("fix magic inmem_bufcap option");
     let perf_opts = PerfOpts::default();
     let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
@@ -136,19 +134,26 @@ async fn events_conn_handler_inner_try(
                 debug!("ignored incoming frame {:?}", item);
             }
             Err(e) => {
-                return Err((e, netout).into());
+                return Err(e);
             }
         }
     }
-    debug!("events_conn_handler input frames received");
+    Ok(frames)
+}
+
+async fn events_parse_input_query(
+    frames: Vec<InMemoryFrame>,
+    ncc: &NodeConfigCached,
+) -> Result<(PlainEventsQuery, ChConf), Error> {
     if frames.len() != 1 {
         error!("{:?}", frames);
         error!("missing command frame len {}", frames.len());
-        return Err((Error::with_msg("missing command frame"), netout).into());
+        let e = Error::with_msg("missing command frame");
+        return Err(e);
     }
     let query_frame = &frames[0];
     if query_frame.tyid() != EVENT_QUERY_JSON_STRING_FRAME {
-        return Err((Error::with_msg("query frame wrong type"), netout).into());
+        return Err(Error::with_msg("query frame wrong type"));
     }
     // TODO this does not need all variants of Sitemty.
     let qitem = match decode_frame::<Sitemty<EventQueryJsonStringFrame>>(query_frame) {
         Ok(k) => match k {
             Ok(k) => match k {
                 StreamItem::DataItem(k) => match k {
                     RangeCompletableItem::Data(k) => k,
-                    RangeCompletableItem::RangeComplete => {
-                        return Err((Error::with_msg("bad query item"), netout).into())
-                    }
+                    RangeCompletableItem::RangeComplete => return Err(Error::with_msg("bad query item")),
                 },
-                _ => return Err((Error::with_msg("bad query item"), netout).into()),
+                _ => return Err(Error::with_msg("bad query item")),
             },
-            Err(e) => return Err((e, netout).into()),
+            Err(e) => return Err(e),
         },
-        Err(e) => return Err((e, netout).into()),
+        Err(e) => return Err(e),
     };
     let res: Result<PlainEventsQuery, _> = serde_json::from_str(&qitem.0);
     let evq = match res {
         Ok(k) => k,
         Err(e) => {
-            error!("json parse error: {:?}", e);
-            return Err((Error::with_msg("json parse error"), netout).into());
+            let e = Error::with_msg_no_trace(format!("json parse error: {e}"));
+            error!("{e}");
+            return Err(e);
         }
     };
-    info!("events_conn_handler_inner_try evq {:?}", evq);
+    info!("events_parse_input_query evq {:?}", evq);
+    let chconf = crate::channelconfig::channel_config(evq.range().try_into()?, evq.channel().clone(), ncc)
+        .await
+        .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
+    Ok((evq, chconf))
+}
-
-    if evq.channel().name() == "test-do-trigger-main-error" {
-        let e = Error::with_msg(format!("Test error private message."))
-            .add_public_msg(format!("Test error PUBLIC message."));
-        return Err((e, netout).into());
-    }
-
+async fn events_conn_handler_inner_try(
+    stream: TcpStream,
+    addr: SocketAddr,
+    node_config: &NodeConfigCached,
+) -> Result<(), ConnErr> {
+    let _ = addr;
+    let (netin, mut netout) = stream.into_split();
+    let frames = match events_get_input_frames(netin).await {
+        Ok(x) => x,
+        Err(e) => return Err((e, netout).into()),
+    };
+    debug!("events_conn_handler input frames received");
+    let (evq, chconf) = match events_parse_input_query(frames, node_config).await {
+        Ok(x) => x,
+        Err(e) => return Err((e, netout).into()),
+    };
     let mut stream: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>> = if evq.is_event_blobs() {
         // TODO support event blobs as transform
         match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
@@ -194,7 +213,7 @@ async fn events_conn_handler_inner_try(
             }
         }
     } else {
-        match make_channel_events_stream(evq.clone(), node_config).await {
+        match make_channel_events_stream(evq.clone(), chconf, node_config).await {
             Ok(stream) => {
                 let stream = stream
                     .map({
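// ---- Editor's example (annotation, not part of the patch) ----
// The split above lets events_get_input_frames and events_parse_input_query
// return a plain Error, while only the top-level handler owns the socket and
// converts to ConnErr (error paired with the write half) so the failure can
// still be reported to the peer. Simplified stand-ins below; WriteHalf is a
// hypothetical placeholder for tokio's OwnedWriteHalf, and String replaces
// err::Error.
struct WriteHalf;

struct ConnErr {
    err: String,
    netout: WriteHalf,
}

impl From<(String, WriteHalf)> for ConnErr {
    fn from((err, netout): (String, WriteHalf)) -> Self {
        Self { err, netout }
    }
}

// Helper: no socket in its signature, so it composes and tests easily.
fn parse_query(frames: &[Vec<u8>]) -> Result<String, String> {
    if frames.len() != 1 {
        return Err("missing command frame".to_string());
    }
    Ok("query".to_string())
}

fn handler(frames: Vec<Vec<u8>>, netout: WriteHalf) -> Result<(), ConnErr> {
    // Only here is the error paired with the write half.
    let _q = match parse_query(&frames) {
        Ok(x) => x,
        Err(e) => return Err((e, netout).into()),
    };
    Ok(())
}

fn main() {
    if let Err(c) = handler(Vec::new(), WriteHalf) {
        let _ = c.netout; // would be used to send an error frame to the peer
        println!("handler failed: {}", c.err);
    }
}
// ---- end editor's example ----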
diff --git a/nodenet/src/scylla.rs b/nodenet/src/scylla.rs
index 0940359..857e417 100644
--- a/nodenet/src/scylla.rs
+++ b/nodenet/src/scylla.rs
@@ -8,6 +8,7 @@ use items_0::Appendable;
 use items_0::Empty;
 use items_2::channelevents::ChannelEvents;
 use netpod::log::*;
+use netpod::ChConf;
 use netpod::NodeConfigCached;
 use netpod::ScyllaConfig;
 use query::api4::events::PlainEventsQuery;
@@ -15,6 +16,7 @@ use std::pin::Pin;
 
 pub async fn scylla_channel_event_stream(
     evq: PlainEventsQuery,
+    chconf: ChConf,
     scyco: &ScyllaConfig,
     node_config: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
@@ -26,12 +28,12 @@ pub async fn scylla_channel_event_stream(
         .await
         .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
     let scy = scyllaconn::create_scy_session(scyco).await?;
-    let series = f.series;
+    let series = f.try_series()?;
     let scalar_type = f.scalar_type;
     let shape = f.shape;
     let do_test_stream_error = false;
     let with_values = evq.need_value_data();
-    debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}");
+    debug!("Make EventsStreamScylla for {series:?} {scalar_type:?} {shape:?}");
     let stream = scyllaconn::events::EventsStreamScylla::new(
         series,
         evq.range().into(),
@@ -43,38 +45,6 @@ pub async fn scylla_channel_event_stream(
         do_test_stream_error,
     );
     let stream = stream
-        .map({
-            let is_pulse_id_diff = evq.transform().is_pulse_id_diff();
-            let mut pulse_last = None;
-            move |item| match item {
-                Ok(item) => {
-                    let x = if is_pulse_id_diff {
-                        let x = match item {
-                            ChannelEvents::Events(item) => {
-                                let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item);
-                                let mut item = items_2::eventsdim0::EventsDim0::empty();
-                                for (ts, pulse) in tss.into_iter().zip(pulses) {
-                                    let value = if let Some(last) = pulse_last {
-                                        pulse as i64 - last as i64
-                                    } else {
-                                        0
-                                    };
-                                    item.push(ts, pulse, value);
-                                    pulse_last = Some(pulse);
-                                }
-                                ChannelEvents::Events(Box::new(item))
-                            }
-                            ChannelEvents::Status(x) => ChannelEvents::Status(x),
-                        };
-                        x
-                    } else {
-                        item
-                    };
-                    Ok(x)
-                }
-                Err(e) => Err(e),
-            }
-        })
         .map(move |item| match &item {
             Ok(k) => match k {
                 ChannelEvents::Events(k) => {
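// ---- Editor's example (annotation, not part of the patch) ----
// The .map block removed above (and re-added in streams/src/plaineventsjson.rs
// further down) turns a pulse id sequence into per-event diffs, emitting 0 for
// the first event. A standalone version of that fold over plain u64 pulses
// instead of ChannelEvents; note that pulse_last lives outside the per-item
// closure, which is also the correction the plaineventsjson.rs hunk below
// makes by moving `let mut pulse_last` out of the map closure.
fn pulse_id_diffs(pulses: &[u64]) -> Vec<i64> {
    let mut pulse_last: Option<u64> = None;
    pulses
        .iter()
        .map(|&pulse| {
            // First event has no predecessor, so its diff is defined as 0.
            let value = if let Some(last) = pulse_last {
                pulse as i64 - last as i64
            } else {
                0
            };
            pulse_last = Some(pulse);
            value
        })
        .collect()
}

fn main() {
    // Gaps in the pulse id sequence show up as diffs larger than the nominal step.
    assert_eq!(pulse_id_diffs(&[100, 101, 103]), vec![0, 1, 2]);
    println!("{:?}", pulse_id_diffs(&[100, 101, 103]));
}
// ---- end editor's example ----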
&format!("{}", "timeWeightedScalar")); + } + EventTransformQuery::PulseIdDiff => { + g.append_pair(key, &format!("{}", "pulseIdDiff")); + } } } } diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs index 5c7e0ed..8b13789 100644 --- a/streams/src/eventchunker.rs +++ b/streams/src/eventchunker.rs @@ -1,604 +1 @@ -use crate::filechunkread::FileChunkRead; -use crate::needminbuffer::NeedMinBuffer; -use bitshuffle::bitshuffle_decompress; -use bytes::Buf; -use bytes::BytesMut; -use err::Error; -use futures_util::Stream; -use futures_util::StreamExt; -use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::StatsItem; -use items_0::streamitem::StreamItem; -use items_0::Empty; -use items_0::WithLen; -use items_2::eventfull::EventFull; -use netpod::histo::HistoLog2; -use netpod::log::*; -use netpod::range::evrange::NanoRange; -use netpod::timeunits::SEC; -use netpod::ByteSize; -use netpod::ChannelConfig; -use netpod::EventDataReadStats; -use netpod::ScalarType; -use netpod::Shape; -use parse::channelconfig::CompressionMethod; -use std::path::PathBuf; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -use std::time::Instant; -pub struct EventChunker { - inp: NeedMinBuffer, - state: DataFileState, - need_min: u32, - channel_config: ChannelConfig, - errored: bool, - completed: bool, - range: NanoRange, - stats_conf: EventChunkerConf, - seen_beyond_range: bool, - sent_beyond_range: bool, - data_emit_complete: bool, - final_stats_sent: bool, - parsed_bytes: u64, - dbg_path: PathBuf, - max_ts: u64, - expand: bool, - do_decompress: bool, - decomp_dt_histo: HistoLog2, - item_len_emit_histo: HistoLog2, - seen_before_range_count: usize, - seen_after_range_count: usize, - unordered_warn_count: usize, - repeated_ts_warn_count: usize, -} - -impl Drop for EventChunker { - fn drop(&mut self) { - // TODO collect somewhere - debug!( - "EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}", - self.decomp_dt_histo, self.item_len_emit_histo - ); - } -} - -enum DataFileState { - FileHeader, - Event, -} - -struct ParseResult { - events: EventFull, - parsed_bytes: u64, -} - -#[derive(Clone, Debug)] -pub struct EventChunkerConf { - pub disk_stats_every: ByteSize, -} - -impl EventChunkerConf { - pub fn new(disk_stats_every: ByteSize) -> Self { - Self { disk_stats_every } - } -} - -impl EventChunker { - // TODO `expand` flag usage - pub fn from_start( - inp: Pin> + Send>>, - channel_config: ChannelConfig, - range: NanoRange, - stats_conf: EventChunkerConf, - dbg_path: PathBuf, - expand: bool, - do_decompress: bool, - ) -> Self { - info!("EventChunker::{} do_decompress {}", "from_start", do_decompress); - let mut inp = NeedMinBuffer::new(inp); - inp.set_need_min(6); - Self { - inp, - state: DataFileState::FileHeader, - need_min: 6, - channel_config, - errored: false, - completed: false, - range, - stats_conf, - seen_beyond_range: false, - sent_beyond_range: false, - data_emit_complete: false, - final_stats_sent: false, - parsed_bytes: 0, - dbg_path, - max_ts: 0, - expand, - do_decompress, - decomp_dt_histo: HistoLog2::new(8), - item_len_emit_histo: HistoLog2::new(0), - seen_before_range_count: 0, - seen_after_range_count: 0, - unordered_warn_count: 0, - repeated_ts_warn_count: 0, - } - } - - // TODO `expand` flag usage - pub fn from_event_boundary( - inp: Pin> + Send>>, - channel_config: ChannelConfig, - range: NanoRange, - stats_conf: EventChunkerConf, - dbg_path: PathBuf, - expand: bool, - do_decompress: bool, - ) -> Self { - info!( - 
"EventChunker::{} do_decompress {}", - "from_event_boundary", do_decompress - ); - let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress); - ret.state = DataFileState::Event; - ret.need_min = 4; - ret.inp.set_need_min(4); - ret - } - - fn parse_buf(&mut self, buf: &mut BytesMut) -> Result { - span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf)) - } - - fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { - let mut ret = EventFull::empty(); - let mut parsed_bytes = 0; - use byteorder::{ReadBytesExt, BE}; - loop { - if (buf.len() as u32) < self.need_min { - break; - } - match self.state { - DataFileState::FileHeader => { - if buf.len() < 6 { - Err(Error::with_msg("need min 6 for FileHeader"))?; - } - let mut sl = std::io::Cursor::new(buf.as_ref()); - let fver = sl.read_i16::().unwrap(); - if fver != 0 { - Err(Error::with_msg("unexpected data file version"))?; - } - let len = sl.read_i32::().unwrap(); - if len <= 0 || len >= 128 { - Err(Error::with_msg("large channel header len"))?; - } - let totlen = len as usize + 2; - if buf.len() < totlen { - self.need_min = totlen as u32; - break; - } else { - sl.advance(len as usize - 8); - let len2 = sl.read_i32::().unwrap(); - if len != len2 { - Err(Error::with_msg("channel header len mismatch"))?; - } - String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?; - self.state = DataFileState::Event; - self.need_min = 4; - buf.advance(totlen); - parsed_bytes += totlen as u64; - } - } - DataFileState::Event => { - let p0 = 0; - let mut sl = std::io::Cursor::new(buf.as_ref()); - let len = sl.read_i32::().unwrap(); - if len < 20 || len > 1024 * 1024 * 20 { - Err(Error::with_msg("unexpected large event chunk"))?; - } - let len = len as u32; - if (buf.len() as u32) < len { - self.need_min = len as u32; - break; - } else { - let mut sl = std::io::Cursor::new(buf.as_ref()); - let len1b = sl.read_i32::().unwrap(); - assert!(len == len1b as u32); - let _ttl = sl.read_i64::().unwrap(); - let ts = sl.read_i64::().unwrap() as u64; - let pulse = sl.read_i64::().unwrap() as u64; - if ts == self.max_ts { - if self.repeated_ts_warn_count < 20 { - let msg = format!( - "EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", - self.repeated_ts_warn_count, - ts / SEC, - ts % SEC, - self.max_ts / SEC, - self.max_ts % SEC, - self.channel_config.shape, - self.dbg_path - ); - warn!("{}", msg); - self.repeated_ts_warn_count += 1; - } - } - if ts < self.max_ts { - if self.unordered_warn_count < 20 { - let msg = format!( - "EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}", - self.unordered_warn_count, - ts / SEC, - ts % SEC, - self.max_ts / SEC, - self.max_ts % SEC, - self.channel_config.shape, - self.dbg_path - ); - warn!("{}", msg); - self.unordered_warn_count += 1; - let e = Error::with_public_msg_no_trace(msg); - return Err(e); - } - } - self.max_ts = ts; - if ts >= self.range.end { - self.seen_after_range_count += 1; - if !self.expand || self.seen_after_range_count >= 2 { - self.seen_beyond_range = true; - self.data_emit_complete = true; - break; - } - } - if ts < self.range.beg { - self.seen_before_range_count += 1; - if self.seen_before_range_count > 1 { - let msg = format!( - "seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}", - ts / SEC, - ts % SEC, - self.range.beg / SEC, - self.range.beg % SEC, - self.range.end / SEC, - self.range.end % SEC, - pulse, - 
self.channel_config.shape, - self.dbg_path - ); - warn!("{}", msg); - let e = Error::with_public_msg(msg); - Err(e)?; - } - } - let _ioc_ts = sl.read_i64::().unwrap(); - let status = sl.read_i8().unwrap(); - let severity = sl.read_i8().unwrap(); - let optional = sl.read_i32::().unwrap(); - if status != 0 { - Err(Error::with_msg(format!("status != 0: {}", status)))?; - } - if severity != 0 { - Err(Error::with_msg(format!("severity != 0: {}", severity)))?; - } - if optional != -1 { - Err(Error::with_msg(format!("optional != -1: {}", optional)))?; - } - let type_flags = sl.read_u8().unwrap(); - let type_index = sl.read_u8().unwrap(); - if type_index > 13 { - Err(Error::with_msg(format!("type_index: {}", type_index)))?; - } - let scalar_type = ScalarType::from_dtype_index(type_index)?; - use super::dtflags::*; - let is_compressed = type_flags & COMPRESSION != 0; - let is_array = type_flags & ARRAY != 0; - let is_big_endian = type_flags & BIG_ENDIAN != 0; - let is_shaped = type_flags & SHAPE != 0; - if let Shape::Wave(_) = self.channel_config.shape { - if !is_array { - Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?; - } - } - let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 }; - let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 }; - assert!(compression_method <= 0); - assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2)); - let mut shape_lens = [0, 0, 0, 0]; - for i1 in 0..shape_dim { - shape_lens[i1 as usize] = sl.read_u32::().unwrap(); - } - let shape_this = { - if is_shaped { - if shape_dim == 1 { - Shape::Wave(shape_lens[0]) - } else if shape_dim == 2 { - Shape::Image(shape_lens[0], shape_lens[1]) - } else { - err::todoval() - } - } else { - Shape::Scalar - } - }; - let comp_this = if is_compressed { - if compression_method == 0 { - Some(CompressionMethod::BitshuffleLZ4) - } else { - err::todoval() - } - } else { - None - }; - let p1 = sl.position(); - let k1 = len as u64 - (p1 - p0) - 4; - if is_compressed { - //debug!("event ts {} is_compressed {}", ts, is_compressed); - let value_bytes = sl.read_u64::().unwrap(); - let block_size = sl.read_u32::().unwrap(); - //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size); - match self.channel_config.shape { - Shape::Scalar => { - assert!(value_bytes < 1024 * 1); - } - Shape::Wave(_) => { - assert!(value_bytes < 1024 * 64); - } - Shape::Image(_, _) => { - assert!(value_bytes < 1024 * 1024 * 20); - } - } - assert!(block_size <= 1024 * 32); - let type_size = scalar_type.bytes() as u32; - let ele_count = value_bytes / type_size as u64; - let ele_size = type_size; - match self.channel_config.shape { - Shape::Scalar => { - if is_array { - Err(Error::with_msg(format!( - "ChannelConfig expects Scalar but we find event is_array" - )))?; - } - } - Shape::Wave(dim1count) => { - if dim1count != ele_count as u32 { - Err(Error::with_msg(format!( - "ChannelConfig expects {:?} but event has ele_count {}", - self.channel_config.shape, ele_count, - )))?; - } - } - Shape::Image(n1, n2) => { - let nt = n1 as usize * n2 as usize; - if nt != ele_count as usize { - Err(Error::with_msg(format!( - "ChannelConfig expects {:?} but event has ele_count {}", - self.channel_config.shape, ele_count, - )))?; - } - } - } - let data = &buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)]; - let decomp = { - if self.do_decompress { - assert!(data.len() > 12); - let ts1 = 
Instant::now(); - let decomp_bytes = (type_size * ele_count as u32) as usize; - let mut decomp = vec![0; decomp_bytes]; - // TODO limit the buf slice range - match bitshuffle_decompress( - &data[12..], - &mut decomp, - ele_count as usize, - ele_size as usize, - 0, - ) { - Ok(c1) => { - assert!(c1 as u64 + 12 == k1); - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1); - // TODO analyze the histo - self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros()); - Some(decomp) - } - Err(e) => { - return Err(Error::with_msg(format!("decompression failed {:?}", e)))?; - } - } - } else { - None - } - }; - ret.add_event( - ts, - pulse, - Some(data.to_vec()), - decomp, - ScalarType::from_dtype_index(type_index)?, - is_big_endian, - shape_this, - comp_this, - ); - } else { - if len < p1 as u32 + 4 { - let msg = format!("uncomp len: {} p1: {}", len, p1); - Err(Error::with_msg(msg))?; - } - let vlen = len - p1 as u32 - 4; - let data = &buf[p1 as usize..(p1 as u32 + vlen) as usize]; - ret.add_event( - ts, - pulse, - Some(data.to_vec()), - Some(data.to_vec()), - ScalarType::from_dtype_index(type_index)?, - is_big_endian, - shape_this, - comp_this, - ); - } - buf.advance(len as usize); - parsed_bytes += len as u64; - self.need_min = 4; - } - } - } - } - Ok(ParseResult { - events: ret, - parsed_bytes, - }) - } -} - -impl Stream for EventChunker { - type Item = Result>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("EventChunker poll_next on completed"); - } else if self.errored { - self.completed = true; - Ready(None) - } else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 { - let item = EventDataReadStats { - parsed_bytes: self.parsed_bytes, - }; - self.parsed_bytes = 0; - let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item)); - Ready(Some(Ok(ret))) - } else if self.sent_beyond_range { - self.completed = true; - Ready(None) - } else if self.final_stats_sent { - self.sent_beyond_range = true; - trace!("sent_beyond_range"); - if self.seen_beyond_range { - trace!("sent_beyond_range RangeComplete"); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } else { - trace!("sent_beyond_range non-complete"); - continue 'outer; - } - } else if self.data_emit_complete { - let item = EventDataReadStats { - parsed_bytes: self.parsed_bytes, - }; - self.parsed_bytes = 0; - let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item)); - self.final_stats_sent = true; - Ready(Some(Ok(ret))) - } else { - match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(mut fcr))) => { - if false { - // TODO collect for stats: - info!( - "file read bytes {} ms {}", - fcr.buf().len(), - fcr.duration().as_millis() - ); - } - let r = self.parse_buf(fcr.buf_mut()); - match r { - Ok(res) => { - self.parsed_bytes += res.parsed_bytes; - if fcr.buf().len() > 0 { - // TODO gather stats about this: - self.inp.put_back(fcr); - } - match self.channel_config.shape { - Shape::Scalar => { - if self.need_min > 1024 * 8 { - let msg = - format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - return Ready(Some(Err(Error::with_msg(msg)))); - } - } - Shape::Wave(_) => { - if self.need_min > 1024 * 32 { - let msg = - format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - return Ready(Some(Err(Error::with_msg(msg)))); - } - } - Shape::Image(_, _) => { - if self.need_min > 1024 * 1024 * 20 { - let 
msg = - format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - return Ready(Some(Err(Error::with_msg(msg)))); - } - } - } - let x = self.need_min; - self.inp.set_need_min(x); - if false { - info!( - "EventChunker emits {} events tss {:?}", - res.events.len(), - res.events.tss - ); - }; - self.item_len_emit_histo.ingest(res.events.len() as u32); - let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events)); - Ready(Some(Ok(ret))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e.into()))) - } - } - } - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - self.data_emit_complete = true; - continue 'outer; - } - Pending => Pending, - } - }; - } - } -} - -#[cfg(test)] -mod test { - //use err::Error; - //use netpod::timeunits::*; - //use netpod::{ByteSize, Nanos}; - - /* - #[test] - fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> { - let chn = netpod::Channel { - backend: "testbackend".into(), - name: "scalar-i32-be".into(), - }; - // TODO read config from disk. - let channel_config = ChannelConfig { - channel: chn, - keyspace: 2, - time_bin_size: Nanos { ns: DAY }, - scalar_type: netpod::ScalarType::I32, - byte_order: netpod::ByteOrder::big_endian(), - shape: netpod::Shape::Scalar, - array: false, - compression: false, - }; - let cluster = taskrun::test_cluster(); - let node = cluster.nodes[nodeix].clone(); - let buffer_size = 512; - let event_chunker_conf = EventChunkerConf { - disk_stats_every: ByteSize::kb(1024), - }; - } - */ -} diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index c50c926..b9191b2 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -56,9 +56,9 @@ pub async fn plain_events_json(evq: &PlainEventsQuery, chconf: &ChConf, cluster: use items_0::streamitem::Sitemty; use std::pin::Pin; let stream: Pin> + Send>> = if evq.transform().is_pulse_id_diff() { - Box::pin(stream.map(|item| { - let mut pulse_last = None; - on_sitemty_data!(item, move |item| { + let mut pulse_last = None; + Box::pin(stream.map(move |item| { + on_sitemty_data!(item, |item| { use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StreamItem; use items_0::Appendable;