From 91947dec0f2634956646fcf151685b3f002930ca Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 18 Jul 2023 11:27:39 +0200 Subject: [PATCH] rustc panics --- crates/bitshuffle/src/bitshuffle.c | 2 +- crates/dbconn/src/dbconn.rs | 49 +++- crates/dbconn/src/scan.rs | 3 +- crates/disk/Cargo.toml | 2 +- crates/disk/src/aggtest.rs | 4 +- crates/disk/src/channelconfig.rs | 11 +- crates/disk/src/dataopen.rs | 1 + crates/disk/src/decode.rs | 17 +- crates/disk/src/disk.rs | 18 +- crates/disk/src/eventblobs.rs | 14 +- crates/disk/src/eventchunker.rs | 126 +--------- crates/disk/src/paths.rs | 39 +-- crates/disk/src/raw/conn.rs | 28 +-- crates/disk/src/raw/generated.rs | 6 +- crates/dq/src/bin/dq.rs | 8 +- crates/err/src/lib.rs | 17 +- crates/httpclient/src/httpclient.rs | 2 +- crates/httpret/src/api1.rs | 148 ++++++----- crates/httpret/src/api4/binned.rs | 7 +- crates/httpret/src/api4/databuffer_tools.rs | 244 +++++++++++++++++-- crates/httpret/src/api4/events.rs | 14 +- crates/httpret/src/api4/search.rs | 2 +- crates/httpret/src/api4/status.rs | 10 +- crates/httpret/src/bodystream.rs | 94 +------ crates/httpret/src/channel_status.rs | 2 +- crates/httpret/src/channelconfig.rs | 10 +- crates/httpret/src/download.rs | 28 ++- crates/httpret/src/gather.rs | 49 ++-- crates/httpret/src/httpret.rs | 123 +++++++--- crates/httpret/src/prometheus.rs | 43 ++-- crates/httpret/src/proxy.rs | 2 +- crates/httpret/src/proxy/api1.rs | 9 +- crates/httpret/src/proxy/api1/reqstatus.rs | 2 +- crates/httpret/src/proxy/api4.rs | 2 +- crates/httpret/src/proxy/api4/caioclookup.rs | 2 +- crates/httpret/src/pulsemap.rs | 2 +- crates/httpret/src/settings.rs | 12 +- crates/items_0/src/streamitem.rs | 2 + crates/items_2/Cargo.toml | 1 + crates/items_2/src/eventfull.rs | 109 +++++++-- crates/netpod/src/netpod.rs | 37 ++- crates/nodenet/src/configquorum.rs | 13 + crates/nodenet/src/conn.rs | 237 +++++++++--------- crates/nodenet/src/conn/test.rs | 2 +- crates/parse/src/channelconfig.rs | 63 ++--- crates/query/src/api4/events.rs | 11 +- crates/streams/src/plaineventsjson.rs | 3 +- crates/streams/src/slidebuf.rs | 22 +- crates/streams/src/timebinnedjson.rs | 9 +- 49 files changed, 982 insertions(+), 679 deletions(-) diff --git a/crates/bitshuffle/src/bitshuffle.c b/crates/bitshuffle/src/bitshuffle.c index 30037be..7866f50 100644 --- a/crates/bitshuffle/src/bitshuffle.c +++ b/crates/bitshuffle/src/bitshuffle.c @@ -20,7 +20,7 @@ // Constants. // Use fast decompression instead of safe decompression for LZ4. -#define BSHUF_LZ4_DECOMPRESS_FAST +// #define BSHUF_LZ4_DECOMPRESS_FAST // Macros. diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 6415a5c..971aac6 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -8,13 +8,16 @@ pub mod pg { } use err::anyhow; +use err::thiserror; use err::Error; use err::Res2; +use err::ThisError; use netpod::log::*; use netpod::TableSizes; use netpod::{Database, NodeConfigCached, SfDbChannel}; use netpod::{ScalarType, Shape}; use pg::{Client as PgClient, NoTls}; +use serde::Serialize; use std::sync::Arc; use std::time::Duration; @@ -195,7 +198,7 @@ pub async fn find_series(channel: &SfDbChannel, pgclient: Arc) -> Resu // Currently only for sf-databuffer type backends // Note: we currently treat the channels primary key as series-id for sf-databuffer type backends. 
pub async fn find_series_sf_databuffer(channel: &SfDbChannel, pgclient: Arc) -> Res2 { - info!("find_series channel {:?}", channel); + debug!("find_series_sf_databuffer {:?}", channel); let sql = "select rowid from facilities where name = $1"; let rows = pgclient.query(sql, &[&channel.backend()]).await.err_conv()?; let row = rows @@ -218,3 +221,47 @@ pub async fn find_series_sf_databuffer(channel: &SfDbChannel, pgclient: Arc(0) as u64; Ok(series) } + +#[derive(Debug, ThisError, Serialize)] +pub enum FindChannelError { + UnknownBackend, + BadSeriesId, + NoFound, + MultipleFound, + Database(String), +} + +// On sf-databuffer, the channel name identifies the series. But we can also have a series id. +// This function is used if the request provides only the series-id, but no name. +pub async fn find_sf_channel_by_series( + channel: SfDbChannel, + pgclient: Arc, +) -> Result { + debug!("find_sf_channel_by_series {:?}", channel); + let series = channel.series().ok_or_else(|| FindChannelError::BadSeriesId)?; + let sql = "select rowid from facilities where name = $1"; + let rows = pgclient + .query(sql, &[&channel.backend()]) + .await + .map_err(|e| FindChannelError::Database(e.to_string()))?; + let row = rows + .into_iter() + .next() + .ok_or_else(|| FindChannelError::UnknownBackend)?; + let backend_id: i64 = row.get(0); + let sql = "select name from channels where facility = $1 and rowid = $2"; + let rows = pgclient + .query(sql, &[&backend_id, &(series as i64)]) + .await + .map_err(|e| FindChannelError::Database(e.to_string()))?; + if rows.len() > 1 { + return Err(FindChannelError::MultipleFound); + } + if let Some(row) = rows.into_iter().next() { + let name = row.get::<_, String>(0); + let channel = SfDbChannel::from_full(channel.backend(), channel.series(), name); + Ok(channel) + } else { + return Err(FindChannelError::NoFound); + } +} diff --git a/crates/dbconn/src/scan.rs b/crates/dbconn/src/scan.rs index 69e88ca..0f6ac77 100644 --- a/crates/dbconn/src/scan.rs +++ b/crates/dbconn/src/scan.rs @@ -12,7 +12,6 @@ use futures_util::Stream; use netpod::log::*; use netpod::Database; use netpod::NodeConfigCached; -use parse::channelconfig::NErr; use pin_project::pin_project; use serde::Deserialize; use serde::Serialize; @@ -513,7 +512,7 @@ async fn update_db_with_channel_config( }; if do_parse { let buf = tokio::fs::read(&path).await?; - let config = parse::channelconfig::parse_config(&buf).map_err(NErr::from)?.1; + let config = parse::channelconfig::parse_config(&buf).map_err(|e| Error::from(e.to_string()))?; match config_id { None => { dbc.query( diff --git a/crates/disk/Cargo.toml b/crates/disk/Cargo.toml index 68b55c5..5cbbef2 100644 --- a/crates/disk/Cargo.toml +++ b/crates/disk/Cargo.toml @@ -37,10 +37,10 @@ err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } query = { path = "../query" } -bitshuffle = { path = "../bitshuffle" } dbconn = { path = "../dbconn" } parse = { path = "../parse" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } streams = { path = "../streams" } httpclient = { path = "../httpclient" } +bitshuffle = { path = "../bitshuffle" } diff --git a/crates/disk/src/aggtest.rs b/crates/disk/src/aggtest.rs index 7a46206..75da0e7 100644 --- a/crates/disk/src/aggtest.rs +++ b/crates/disk/src/aggtest.rs @@ -86,10 +86,10 @@ async fn agg_x_dim_0_inner() { 0, disk_io_tune, event_chunker_conf, - false, true, // TODO 32, + netpod::ReqCtx::new("req-000"), ); let _ = fut1; // TODO add the binning and expectation and await the 
result. @@ -148,10 +148,10 @@ async fn agg_x_dim_1_inner() { 0, disk_io_tune, event_chunker_conf, - false, true, // TODO 32, + netpod::ReqCtx::new("req-000"), ); let _ = fut1; // TODO add the binning and expectation and await the result. diff --git a/crates/disk/src/channelconfig.rs b/crates/disk/src/channelconfig.rs index 3220b21..934fcb1 100644 --- a/crates/disk/src/channelconfig.rs +++ b/crates/disk/src/channelconfig.rs @@ -1,5 +1,5 @@ use crate::SfDbChConf; -use err::thiserror; +use err::*; #[allow(unused)] use netpod::log::*; use netpod::range::evrange::NanoRange; @@ -10,21 +10,14 @@ use parse::channelconfig::read_local_config; use parse::channelconfig::ChannelConfigs; use parse::channelconfig::ConfigEntry; use parse::channelconfig::ConfigParseError; -use std::fmt; -#[derive(Debug, thiserror::Error)] +#[derive(Debug, ThisError)] pub enum ConfigError { ParseError(ConfigParseError), NotFound, Error, } -// impl fmt::Display for ConfigError { -// fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { -// write!(fmt, "ConfigError::{self:?}") -// } -// } - impl From for ConfigError { fn from(value: ConfigParseError) -> Self { match value { diff --git a/crates/disk/src/dataopen.rs b/crates/disk/src/dataopen.rs index 778762c..567a51e 100644 --- a/crates/disk/src/dataopen.rs +++ b/crates/disk/src/dataopen.rs @@ -19,6 +19,7 @@ use tokio::io::AsyncSeekExt; use tokio::io::ErrorKind; use tokio::io::SeekFrom; +#[cfg(test)] const BACKEND: &str = "testbackend-00"; pub struct Positioned { diff --git a/crates/disk/src/decode.rs b/crates/disk/src/decode.rs index e405641..bb23505 100644 --- a/crates/disk/src/decode.rs +++ b/crates/disk/src/decode.rs @@ -248,7 +248,10 @@ fn make_scalar_conv( agg_kind: &AggKind, ) -> Result, Error> { let ret = match agg_kind { - AggKind::EventBlobs => todo!("make_scalar_conv EventBlobs"), + AggKind::EventBlobs => { + error!("make_scalar_conv EventBlobs"); + return Err(Error::with_msg_no_trace("make_scalar_conv EventBlobs")); + } AggKind::Plain | AggKind::DimXBinsN(_) | AggKind::DimXBins1 @@ -285,7 +288,10 @@ fn make_scalar_conv( ScalarType::STRING => ValueDim1FromBytesImpl::::boxed(shape), } } - Shape::Image(_, _) => todo!("make_scalar_conv Image"), + Shape::Image(_, _) => { + error!("make_scalar_conv Image"); + return Err(Error::with_msg_no_trace("make_scalar_conv Image")); + } }, }; Ok(ret) @@ -343,7 +349,7 @@ impl EventsDynStream { fn replace_events_out(&mut self) -> Result, Error> { let st = &self.scalar_type; let sh = &self.shape; - error!("TODO replace_events_out feed through transform"); + // error!("TODO replace_events_out feed through transform"); // TODO do we need/want the empty item from here? 
let empty = items_2::empty::empty_events_dyn_ev(st, sh)?; let evs = mem::replace(&mut self.events_out, empty); @@ -362,11 +368,6 @@ impl EventsDynStream { .zip(item.pulses.iter()) { let endian = if be { Endian::Big } else { Endian::Little }; - let buf = if let Some(x) = buf { - x - } else { - return Err(Error::with_msg_no_trace("no buf in event")); - }; self.scalar_conv .convert(ts, pulse, buf, endian, self.events_out.as_mut())?; } diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs index cb002a1..17edc0e 100644 --- a/crates/disk/src/disk.rs +++ b/crates/disk/src/disk.rs @@ -283,6 +283,7 @@ fn start_read5( file: File, tx: async_channel::Sender>, disk_io_tune: DiskIoTune, + reqid: String, ) -> Result<(), Error> { let fut = async move { let mut file = file; @@ -300,7 +301,7 @@ fn start_read5( } }; let mut pos = pos_beg; - info!("read5 begin {disk_io_tune:?}"); + debug!("read5 begin {disk_io_tune:?}"); loop { let mut buf = BytesMut::new(); buf.resize(disk_io_tune.read_buffer_len, 0); @@ -357,9 +358,9 @@ pub struct FileContentStream5 { } impl FileContentStream5 { - pub fn new(path: PathBuf, file: File, disk_io_tune: DiskIoTune) -> Result { + pub fn new(path: PathBuf, file: File, disk_io_tune: DiskIoTune, reqid: String) -> Result { let (tx, rx) = async_channel::bounded(32); - start_read5(path, file, tx, disk_io_tune)?; + start_read5(path, file, tx, disk_io_tune, reqid)?; let ret = Self { rx }; Ok(ret) } @@ -747,11 +748,16 @@ impl Stream for FileContentStream4 { } } -pub fn file_content_stream( +pub fn file_content_stream( path: PathBuf, file: File, disk_io_tune: DiskIoTune, -) -> Pin> + Send>> { + reqid: S, +) -> Pin> + Send>> +where + S: Into, +{ + let reqid = reqid.into(); debug!("file_content_stream disk_io_tune {disk_io_tune:?}"); match &disk_io_tune.read_sys { ReadSys::TokioAsyncRead => { @@ -771,7 +777,7 @@ pub fn file_content_stream( Box::pin(s) as _ } ReadSys::Read5 => { - let s = FileContentStream5::new(path, file, disk_io_tune).unwrap(); + let s = FileContentStream5::new(path, file, disk_io_tune, reqid).unwrap(); Box::pin(s) as _ } } diff --git a/crates/disk/src/eventblobs.rs b/crates/disk/src/eventblobs.rs index bf7ab87..dd621c4 100644 --- a/crates/disk/src/eventblobs.rs +++ b/crates/disk/src/eventblobs.rs @@ -18,6 +18,7 @@ use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::DiskIoTune; use netpod::Node; +use netpod::ReqCtxArc; use netpod::SfChFetchInfo; use std::collections::VecDeque; use std::pin::Pin; @@ -39,7 +40,6 @@ pub struct EventChunkerMultifile { files_count: u32, node_ix: usize, expand: bool, - do_decompress: bool, max_ts: u64, out_max_len: usize, emit_count: usize, @@ -49,6 +49,7 @@ pub struct EventChunkerMultifile { done: bool, done_emit_range_final: bool, complete: bool, + reqctx: ReqCtxArc, } impl EventChunkerMultifile { @@ -64,10 +65,10 @@ impl EventChunkerMultifile { disk_io_tune: DiskIoTune, event_chunker_conf: EventChunkerConf, expand: bool, - do_decompress: bool, out_max_len: usize, + reqctx: ReqCtxArc, ) -> Self { - info!("EventChunkerMultifile expand {expand} do_decompress {do_decompress}"); + debug!("EventChunkerMultifile expand {expand}"); let file_chan = if expand { open_expanded_files(&range, &fetch_info, node) } else { @@ -83,7 +84,6 @@ impl EventChunkerMultifile { files_count: 0, node_ix, expand, - do_decompress, max_ts: 0, out_max_len, emit_count: 0, @@ -93,6 +93,7 @@ impl EventChunkerMultifile { done: false, done_emit_range_final: false, complete: false, + reqctx, } } } @@ -103,7 +104,6 @@ impl Stream for 
EventChunkerMultifile { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let span1 = span!(Level::INFO, "EvChMul", node_ix = self.node_ix); let _spg = span1.enter(); - info!("EventChunkerMultifile poll_next"); use Poll::*; 'outer: loop { break if let Some(item) = self.log_queue.pop_front() { @@ -194,6 +194,7 @@ impl Stream for EventChunkerMultifile { path.clone(), file, self.disk_io_tune.clone(), + self.reqctx.reqid(), )); let chunker = EventChunker::from_event_boundary( inp, @@ -202,7 +203,6 @@ impl Stream for EventChunkerMultifile { self.event_chunker_conf.clone(), path.clone(), self.expand, - self.do_decompress, ); let filtered = RangeFilter2::new(chunker, self.range.clone(), self.expand); self.evs = Some(Box::pin(filtered)); @@ -229,6 +229,7 @@ impl Stream for EventChunkerMultifile { of.path.clone(), file, self.disk_io_tune.clone(), + self.reqctx.reqid(), ); let chunker = EventChunker::from_event_boundary( inp, @@ -237,7 +238,6 @@ impl Stream for EventChunkerMultifile { self.event_chunker_conf.clone(), of.path.clone(), self.expand, - self.do_decompress, ); chunkers.push(Box::pin(chunker) as _); } diff --git a/crates/disk/src/eventchunker.rs b/crates/disk/src/eventchunker.rs index fcbaae7..e2530e2 100644 --- a/crates/disk/src/eventchunker.rs +++ b/crates/disk/src/eventchunker.rs @@ -1,8 +1,8 @@ -use bitshuffle::bitshuffle_decompress; use bytes::Buf; use bytes::BytesMut; use err::thiserror; use err::Error; +use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; @@ -21,23 +21,21 @@ use netpod::ScalarType; use netpod::SfChFetchInfo; use netpod::Shape; use parse::channelconfig::CompressionMethod; +use serde::Deserialize; +use serde::Serialize; use std::io::Cursor; use std::path::PathBuf; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use std::time::Instant; use streams::dtflags::*; use streams::filechunkread::FileChunkRead; use streams::needminbuffer::NeedMinBuffer; -#[derive(Debug, thiserror::Error)] +#[derive(Debug, ThisError, Serialize, Deserialize)] pub enum DataParseError { - #[error("DataFrameLengthMismatch")] DataFrameLengthMismatch, - #[error("FileHeaderTooShort")] FileHeaderTooShort, - #[error("BadVersionTag")] BadVersionTag, #[error("HeaderTooLarge")] HeaderTooLarge, @@ -59,9 +57,7 @@ pub enum DataParseError { ShapedWithoutDims, #[error("TooManyDims")] TooManyDims, - #[error("UnknownCompression")] UnknownCompression, - #[error("BadCompresionBlockSize")] BadCompresionBlockSize, } @@ -83,7 +79,6 @@ pub struct EventChunker { dbg_path: PathBuf, last_ts: u64, expand: bool, - do_decompress: bool, decomp_dt_histo: HistoLog2, item_len_emit_histo: HistoLog2, seen_before_range_count: usize, @@ -155,36 +150,6 @@ fn is_config_match(is_array: &bool, ele_count: &u64, fetch_info: &SfChFetchInfo) } } -#[derive(Debug, thiserror::Error)] -pub enum DecompError { - #[error("Error")] - Error, -} - -fn decompress(databuf: &[u8], type_size: u32, ele_count: u64) -> Result, DecompError> { - if databuf.len() < 13 { - return Err(DecompError::Error); - } - let ts1 = Instant::now(); - let decomp_bytes = type_size as u64 * ele_count; - let mut decomp = vec![0; decomp_bytes as usize]; - let ele_size = type_size; - // TODO limit the buf slice range - match bitshuffle_decompress(&databuf[12..], &mut decomp, ele_count as usize, ele_size as usize, 0) { - Ok(c1) => { - if 12 + c1 != databuf.len() {} - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1); - // TODO analyze the histo - 
//self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros()); - Ok(decomp) - } - Err(e) => { - return Err(DecompError::Error); - } - } -} - impl EventChunker { pub fn self_name() -> &'static str { std::any::type_name::() @@ -198,14 +163,8 @@ impl EventChunker { stats_conf: EventChunkerConf, dbg_path: PathBuf, expand: bool, - do_decompress: bool, ) -> Self { - info!( - "{}::{} do_decompress {}", - Self::self_name(), - "from_start", - do_decompress - ); + info!("{}::{}", Self::self_name(), "from_start"); let need_min_max = match fetch_info.shape() { Shape::Scalar => 1024 * 8, Shape::Wave(_) => 1024 * 32, @@ -231,7 +190,6 @@ impl EventChunker { dbg_path, last_ts: 0, expand, - do_decompress, decomp_dt_histo: HistoLog2::new(8), item_len_emit_histo: HistoLog2::new(0), seen_before_range_count: 0, @@ -251,15 +209,9 @@ impl EventChunker { stats_conf: EventChunkerConf, dbg_path: PathBuf, expand: bool, - do_decompress: bool, ) -> Self { - info!( - "{}::{} do_decompress {}", - Self::self_name(), - "from_event_boundary", - do_decompress - ); - let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand, do_decompress); + info!("{}::{}", Self::self_name(), "from_event_boundary"); + let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); @@ -275,7 +227,7 @@ impl EventChunker { fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { use byteorder::ReadBytesExt; use byteorder::BE; - info!("parse_buf_inner buf len {}", buf.len()); + trace!("parse_buf_inner buf len {}", buf.len()); let mut ret = EventFull::empty(); let mut parsed_bytes = 0; loop { @@ -485,37 +437,13 @@ impl EventChunker { let n1 = p1 - p0; let n2 = len as u64 - n1 - 4; let databuf = buf[p1 as usize..(p1 as usize + n2 as usize)].as_ref(); - if false && is_compressed { - //debug!("event ts {} is_compressed {}", ts, is_compressed); - let value_bytes = sl.read_u64::().unwrap(); - let block_size = sl.read_u32::().unwrap(); - //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size); - match self.fetch_info.shape() { - Shape::Scalar => { - assert!(value_bytes < 1024 * 1); - } - Shape::Wave(_) => { - assert!(value_bytes < 1024 * 64); - } - Shape::Image(_, _) => { - assert!(value_bytes < 1024 * 1024 * 20); - } - } - if block_size > 1024 * 32 { - return Err(DataParseError::BadCompresionBlockSize); - } - let type_size = scalar_type.bytes() as u32; - let _ele_count = value_bytes / type_size as u64; - let _ele_size = type_size; - } if discard { self.discard_count += 1; } else { ret.add_event( ts, pulse, - Some(databuf.to_vec()), - None, + databuf.to_vec(), scalar_type, is_big_endian, shape_this, @@ -635,39 +563,3 @@ impl Stream for EventChunker { } } } - -#[cfg(test)] -mod test { - //use err::Error; - //use netpod::timeunits::*; - //use netpod::{ByteSize, Nanos}; - - //const TEST_BACKEND: &str = "testbackend-00"; - - /* - #[test] - fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> { - let chn = netpod::Channel { - backend: TEST_BACKEND.into(), - name: "scalar-i32-be".into(), - }; - // TODO read config from disk. 
- let channel_config = ChannelConfig { - channel: chn, - keyspace: 2, - time_bin_size: Nanos { ns: DAY }, - scalar_type: netpod::ScalarType::I32, - byte_order: netpod::ByteOrder::big_endian(), - shape: netpod::Shape::Scalar, - array: false, - compression: false, - }; - let cluster = taskrun::test_cluster(); - let node = cluster.nodes[nodeix].clone(); - let buffer_size = 512; - let event_chunker_conf = EventChunkerConf { - disk_stats_every: ByteSize::kb(1024), - }; - } - */ -} diff --git a/crates/disk/src/paths.rs b/crates/disk/src/paths.rs index 8c18901..027a0db 100644 --- a/crates/disk/src/paths.rs +++ b/crates/disk/src/paths.rs @@ -7,18 +7,18 @@ use netpod::SfChFetchInfo; use netpod::TsNano; use std::path::PathBuf; -// TODO remove/replace this -pub fn datapath(timebin: u64, config: &SfDbChConf, split: u32, node: &Node) -> PathBuf { +pub fn datapath_for_keyspace(ks: u32, node: &Node) -> PathBuf { node.sf_databuffer .as_ref() .unwrap() .data_base_path - .join(format!( - "{}_{}", - node.sf_databuffer.as_ref().unwrap().ksprefix, - config.keyspace - )) + .join(format!("{}_{}", node.sf_databuffer.as_ref().unwrap().ksprefix, ks)) .join("byTime") +} + +// TODO remove/replace this +pub fn datapath(timebin: u64, config: &SfDbChConf, split: u32, node: &Node) -> PathBuf { + datapath_for_keyspace(config.keyspace as u32, node) .join(config.channel.name()) .join(format!("{:019}", timebin)) .join(format!("{:010}", split)) @@ -37,10 +37,7 @@ pub async fn datapaths_for_timebin( node: &Node, ) -> Result, Error> { let sfc = node.sf_databuffer.as_ref().unwrap(); - let timebin_path = sfc - .data_base_path - .join(format!("{}_{}", sfc.ksprefix, fetch_info.ks())) - .join("byTime") + let timebin_path = datapath_for_keyspace(fetch_info.ks() as u32, node) .join(fetch_info.name()) .join(format!("{:019}", timebin)); let rd = tokio::fs::read_dir(timebin_path).await?; @@ -70,12 +67,9 @@ pub async fn datapaths_for_timebin( } } } - let mut ret = vec![]; + let mut ret = Vec::new(); for split in splits { - let path = sfc - .data_base_path - .join(format!("{}_{}", sfc.ksprefix, fetch_info.ks())) - .join("byTime") + let path = datapath_for_keyspace(fetch_info.ks() as u32, node) .join(fetch_info.name()) .join(format!("{:019}", timebin)) .join(format!("{:010}", split)) @@ -86,12 +80,7 @@ pub async fn datapaths_for_timebin( } pub fn channel_timebins_dir_path(fetch_info: &SfChFetchInfo, node: &Node) -> Result { - let sfc = node.sf_databuffer.as_ref().unwrap(); - let ret = sfc - .data_base_path - .join(format!("{}_{}", sfc.ksprefix, fetch_info.ks())) - .join("byTime") - .join(fetch_info.name()); + let ret = datapath_for_keyspace(fetch_info.ks() as u32, node).join(fetch_info.name()); Ok(ret) } @@ -115,11 +104,7 @@ pub fn index_path(ts: TsNano, fetch_info: &SfChFetchInfo, split: u32, node: &Nod } pub fn data_dir_path_tb(ks: u32, channel_name: &str, tb: u32, split: u32, node: &Node) -> Result { - let sfc = node.sf_databuffer.as_ref().unwrap(); - let ret = sfc - .data_base_path - .join(format!("{}_{}", sfc.ksprefix, ks)) - .join("byTime") + let ret = datapath_for_keyspace(ks, node) .join(channel_name) .join(format!("{:019}", tb)) .join(format!("{:010}", split)); diff --git a/crates/disk/src/raw/conn.rs b/crates/disk/src/raw/conn.rs index 352b7af..5739054 100644 --- a/crates/disk/src/raw/conn.rs +++ b/crates/disk/src/raw/conn.rs @@ -17,6 +17,7 @@ use netpod::AggKind; use netpod::ByteSize; use netpod::DiskIoTune; use netpod::NodeConfigCached; +use netpod::ReqCtxArc; use netpod::SfChFetchInfo; use query::api4::events::EventsSubQuery; 
use std::pin::Pin; @@ -55,6 +56,7 @@ fn make_num_pipeline_stream_evs( pub async fn make_event_pipe( evq: EventsSubQuery, fetch_info: SfChFetchInfo, + reqctx: ReqCtxArc, ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { // sf-databuffer type backends identify channels by their (backend, name) only. @@ -72,7 +74,6 @@ pub async fn make_event_pipe( } else { 128 }; - let do_decompress = true; let event_blobs = EventChunkerMultifile::new( (&range).try_into()?, fetch_info.clone(), @@ -81,8 +82,8 @@ pub async fn make_event_pipe( DiskIoTune::default(), event_chunker_conf, one_before, - do_decompress, out_max_len, + reqctx, ); error!("TODO replace AggKind in the called code"); let pipe = make_num_pipeline_stream_evs(fetch_info, AggKind::TimeWeightedScalar, event_blobs); @@ -93,17 +94,12 @@ pub fn make_local_event_blobs_stream( range: NanoRange, fetch_info: SfChFetchInfo, expand: bool, - do_decompress: bool, event_chunker_conf: EventChunkerConf, disk_io_tune: DiskIoTune, + reqctx: ReqCtxArc, node_config: &NodeConfigCached, ) -> Result { - info!( - "make_local_event_blobs_stream {fetch_info:?} do_decompress {do_decompress} disk_io_tune {disk_io_tune:?}" - ); - if do_decompress { - warn!("Possible issue: decompress central storage event blob stream"); - } + info!("make_local_event_blobs_stream {fetch_info:?} disk_io_tune {disk_io_tune:?}"); // TODO should not need this for correctness. // Should limit based on return size and latency. let out_max_len = if node_config.node_config.cluster.is_central_storage { @@ -119,8 +115,8 @@ pub fn make_local_event_blobs_stream( disk_io_tune, event_chunker_conf, expand, - do_decompress, out_max_len, + reqctx, ); Ok(event_blobs) } @@ -129,9 +125,9 @@ pub fn make_remote_event_blobs_stream( range: NanoRange, fetch_info: SfChFetchInfo, expand: bool, - do_decompress: bool, event_chunker_conf: EventChunkerConf, disk_io_tune: DiskIoTune, + reqctx: ReqCtxArc, node_config: &NodeConfigCached, ) -> Result>, Error> { debug!("make_remote_event_blobs_stream"); @@ -150,8 +146,8 @@ pub fn make_remote_event_blobs_stream( disk_io_tune, event_chunker_conf, expand, - do_decompress, out_max_len, + reqctx, ); Ok(event_blobs) } @@ -159,6 +155,7 @@ pub fn make_remote_event_blobs_stream( pub async fn make_event_blobs_pipe_real( subq: &EventsSubQuery, fetch_info: &SfChFetchInfo, + reqctx: ReqCtxArc, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { if false { @@ -177,9 +174,9 @@ pub async fn make_event_blobs_pipe_real( range.try_into()?, fetch_info.clone(), expand, - false, event_chunker_conf, DiskIoTune::default(), + reqctx, node_config, )?; Box::pin(event_blobs) as _ @@ -188,9 +185,9 @@ pub async fn make_event_blobs_pipe_real( range.try_into()?, fetch_info.clone(), expand, - true, event_chunker_conf, DiskIoTune::default(), + reqctx, node_config, )?; /* @@ -251,12 +248,13 @@ pub async fn make_event_blobs_pipe_test( pub async fn make_event_blobs_pipe( subq: &EventsSubQuery, fetch_info: &SfChFetchInfo, + reqctx: ReqCtxArc, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { debug!("make_event_blobs_pipe {subq:?}"); if subq.backend() == TEST_BACKEND { make_event_blobs_pipe_test(subq, node_config).await } else { - make_event_blobs_pipe_real(subq, fetch_info, node_config).await + make_event_blobs_pipe_real(subq, fetch_info, reqctx, node_config).await } } diff --git a/crates/disk/src/raw/generated.rs b/crates/disk/src/raw/generated.rs index 81e9ef5..0bee401 100644 --- a/crates/disk/src/raw/generated.rs +++ b/crates/disk/src/raw/generated.rs @@ -77,8 +77,7 @@ impl 
EventBlobsGeneratorI32Test00 { item.add_event( ts, pulse, - Some(value.to_be_bytes().to_vec()), - None, + value.to_be_bytes().to_vec(), self.scalar_type.clone(), self.be, self.shape.clone(), @@ -178,8 +177,7 @@ impl EventBlobsGeneratorI32Test01 { item.add_event( ts, pulse, - Some(value.to_be_bytes().to_vec()), - None, + value.to_be_bytes().to_vec(), self.scalar_type.clone(), self.be, self.shape.clone(), diff --git a/crates/dq/src/bin/dq.rs b/crates/dq/src/bin/dq.rs index 1879740..415bfb3 100644 --- a/crates/dq/src/bin/dq.rs +++ b/crates/dq/src/bin/dq.rs @@ -56,7 +56,7 @@ pub fn main() -> Result<(), Error> { file.read_exact(&mut buf).await?; drop(file); let config = match parse::channelconfig::parse_config(&buf) { - Ok(k) => k.1, + Ok(k) => k, Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))), }; eprintln!("Read config: {:?}", config); @@ -69,14 +69,14 @@ pub fn main() -> Result<(), Error> { file.read_exact(&mut buf).await?; drop(file); let config = match parse::channelconfig::parse_config(&buf) { - Ok(k) => k.1, + Ok(k) => k, Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))), }; eprintln!("Read config: {:?}", config); let path = sub.datafile; let file = File::open(&path).await?; let disk_io_tune = netpod::DiskIoTune::default(); - let inp = Box::pin(disk::file_content_stream(path.clone(), file, disk_io_tune)); + let inp = Box::pin(disk::file_content_stream(path.clone(), file, disk_io_tune, "req-000")); let ce = &config.entries[0]; let fetch_info = SfChFetchInfo::new( "", @@ -94,7 +94,7 @@ pub fn main() -> Result<(), Error> { let stats_conf = EventChunkerConf { disk_stats_every: ByteSize::from_mb(2), }; - let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, path.clone(), false, true); + let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, path.clone(), true); err::todo(); Ok(()) } diff --git a/crates/err/src/lib.rs b/crates/err/src/lib.rs index 7781499..0f70338 100644 --- a/crates/err/src/lib.rs +++ b/crates/err/src/lib.rs @@ -3,6 +3,8 @@ pub use anyhow; pub use thiserror; pub use thiserror::Error as ThisError; +pub use thiserror::UserErrorClass; +pub use thiserror::UserErrorContent; pub mod bt { pub use backtrace::Backtrace; @@ -476,7 +478,7 @@ pub fn todoval() -> T { todo!("TODO todoval\n{bt:?}") } -pub trait ToPublicError: std::error::Error { +pub trait ToPublicError: std::error::Error + Send { fn to_public_error(&self) -> String; } @@ -546,6 +548,19 @@ mod test { assert_eq!(s, "SomeErrorEnumB0::FromA(SomeErrorEnumA::BadCase)"); } + #[test] + fn error_handle_b0_user_00() { + use thiserror::UserErrorClass; + use thiserror::UserErrorInfo; + let e = failing_b0_00().unwrap_err(); + let s = e.class(); + if let UserErrorClass::Unspecified = s { + () + } else { + panic!() + } + } + #[test] fn error_handle_b1_00() { let e = failing_b1_00().unwrap_err(); diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 9695e5c..6279e80 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -55,7 +55,7 @@ pub async fn http_get(url: Url, accept: &str) -> Result { let client = hyper::Client::new(); let res = client.request(req).await.ec()?; let (head, body) = res.into_parts(); - info!("http_get head {head:?}"); + debug!("http_get head {head:?}"); let body = hyper::body::to_bytes(body).await.ec()?; let ret = HttpResponse { head, body }; Ok(ret) diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 
a063823..17a87c4 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -1,14 +1,13 @@ -use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; use crate::response; -use crate::BodyStream; use crate::ReqCtx; use bytes::BufMut; use bytes::BytesMut; use disk::eventchunker::EventChunkerConf; use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes; use disk::raw::conn::make_local_event_blobs_stream; +use err::Error; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; @@ -36,12 +35,14 @@ use netpod::ChannelTypeConfigGen; use netpod::DiskIoTune; use netpod::NodeConfigCached; use netpod::ProxyConfig; +use netpod::ReqCtxArc; use netpod::SfChFetchInfo; use netpod::SfDbChannel; use netpod::Shape; use netpod::ACCEPT_ALL; use netpod::APP_JSON; use netpod::APP_OCTET; +use netpod::X_DAQBUF_REQID; use parse::api1_parse::Api1ByteOrder; use parse::api1_parse::Api1ChannelHeader; use query::api4::events::EventsSubQuery; @@ -529,15 +530,16 @@ pub struct DataApiPython3DataStream { channels: VecDeque, settings: EventsSubQuerySettings, current_channel: Option, + current_fetch_info: Option, node_config: NodeConfigCached, - chan_stream: Option> + Send>>>, + chan_stream: Option> + Send>>>, config_fut: Option, Error>> + Send>>>, disk_io_tune: DiskIoTune, do_decompress: bool, - #[allow(unused)] - event_count: u64, + event_count: usize, events_max: u64, - status_id: String, + header_out: bool, + reqctx: ReqCtxArc, ping_last: Instant, data_done: bool, completed: bool, @@ -551,7 +553,7 @@ impl DataApiPython3DataStream { disk_io_tune: DiskIoTune, do_decompress: bool, events_max: u64, - status_id: String, + reqctx: ReqCtxArc, node_config: NodeConfigCached, ) -> Self { Self { @@ -559,6 +561,7 @@ impl DataApiPython3DataStream { channels: channels.into_iter().collect(), settings, current_channel: None, + current_fetch_info: None, node_config, chan_stream: None, config_fut: None, @@ -566,7 +569,8 @@ impl DataApiPython3DataStream { do_decompress, event_count: 0, events_max, - status_id, + header_out: false, + reqctx, ping_last: Instant::now(), data_done: false, completed: false, @@ -577,6 +581,7 @@ impl DataApiPython3DataStream { b: EventFull, channel: &ChannelTypeConfigGen, fetch_info: &SfChFetchInfo, + do_decompress: bool, header_out: &mut bool, count_events: &mut usize, ) -> Result { @@ -586,10 +591,10 @@ impl DataApiPython3DataStream { const EVIMAX: usize = 6; if *count_events < EVIMAX { debug!( - "ev info {}/{} decomps len {:?} BE {:?} scalar-type {:?} shape {:?} comps {:?}", + "ev info {}/{} bloblen {:?} BE {:?} scalar-type {:?} shape {:?} comps {:?}", *count_events + 1, EVIMAX, - b.decomps[i1].as_ref().map(|x| x.len()), + b.blobs[i1].len(), b.be[i1], b.scalar_types[i1], b.shapes[i1], @@ -618,26 +623,38 @@ impl DataApiPython3DataStream { b.comps.get(i1).map(|x| x.clone()).unwrap(), ); let h = serde_json::to_string(&head)?; - info!("sending channel header {}", h); + debug!("sending channel header {}", h); let l1 = 1 + h.as_bytes().len() as u32; d.put_u32(l1); d.put_u8(0); - info!("header frame byte len {}", 4 + 1 + h.as_bytes().len()); + debug!("header frame byte len {}", 4 + 1 + h.as_bytes().len()); d.extend_from_slice(h.as_bytes()); d.put_u32(l1); *header_out = true; } match &b.shapes[i1] { _ => { - let empty_blob = Vec::new(); - let blob = b.blobs[i1].as_ref().unwrap_or(&empty_blob); - let l1 = 17 + blob.len() as u32; - d.put_u32(l1); - d.put_u8(1); - d.put_u64(b.tss[i1]); - d.put_u64(b.pulses[i1]); - d.put_slice(&blob); 
- d.put_u32(l1); + if do_decompress { + let blob = b + .data_decompressed(i1, fetch_info.scalar_type(), fetch_info.shape()) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let l1 = 17 + blob.len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&blob); + d.put_u32(l1); + } else { + let blob = b.data_raw(i1); + let l1 = 17 + blob.len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&blob); + d.put_u32(l1); + } } } *count_events += 1; @@ -645,23 +662,42 @@ impl DataApiPython3DataStream { Ok(d) } - fn handle_chan_stream_ready(&mut self, item: Result) -> Option> { + fn handle_chan_stream_ready(&mut self, item: Sitemty) -> Option> { match item { Ok(k) => { let n = Instant::now(); if n.duration_since(self.ping_last) >= Duration::from_millis(2000) { let mut sb = crate::status_board().unwrap(); - sb.mark_alive(&self.status_id); + sb.mark_alive(self.reqctx.reqid()); self.ping_last = n; } - Some(Ok(k)) + match k { + StreamItem::DataItem(k) => match k { + RangeCompletableItem::RangeComplete => todo!(), + RangeCompletableItem::Data(k) => { + let item = Self::convert_item( + k, + self.current_channel.as_ref().unwrap(), + self.current_fetch_info.as_ref().unwrap(), + self.do_decompress, + &mut self.header_out, + &mut self.event_count, + )?; + todo!() + } + }, + StreamItem::Log(k) => todo!(), + StreamItem::Stats(k) => todo!(), + } } Err(e) => { error!("DataApiPython3DataStream emit error: {e:?}"); self.chan_stream = None; + self.current_channel = None; + self.current_fetch_info = None; self.data_done = true; let mut sb = crate::status_board().unwrap(); - sb.add_error(&self.status_id, e); + sb.add_error(self.reqctx.reqid(), e); if false { // TODO format as python data api error frame: let mut buf = BytesMut::with_capacity(1024); @@ -682,14 +718,14 @@ impl DataApiPython3DataStream { self.range.clone().into(), TransformQuery::for_event_blobs(), ); - let subq = EventsSubQuery::from_parts(select, self.settings.clone()); + let subq = EventsSubQuery::from_parts(select, self.settings.clone(), self.reqctx.reqid().into()); let one_before = subq.transform().need_one_before_range(); debug!("query for event blobs retrieval subq {subq:?}"); // TODO important TODO debug!("TODO fix magic inmem_bufcap"); debug!("TODO add timeout option to data api3 download"); // TODO is this a good to place decide this? - let s = if self.node_config.node_config.cluster.is_central_storage { + let stream = if self.node_config.node_config.cluster.is_central_storage { info!("Set up central storage stream"); // TODO pull up this config let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); @@ -697,9 +733,9 @@ impl DataApiPython3DataStream { self.range.clone(), fetch_info.clone(), one_before, - self.do_decompress, event_chunker_conf, self.disk_io_tune.clone(), + self.reqctx.clone(), &self.node_config, )?; Box::pin(s) as Pin> + Send>> @@ -708,33 +744,13 @@ impl DataApiPython3DataStream { let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone()); Box::pin(s) as Pin> + Send>> }; - let s = s.map({ - let mut header_out = false; - let mut count_events = 0; - let channel = self.current_channel.clone().unwrap(); - move |b| { - let ret = match b { - Ok(b) => { - let f = match b { - StreamItem::DataItem(RangeCompletableItem::Data(b)) => { - Self::convert_item(b, &channel, &fetch_info, &mut header_out, &mut count_events)? 
- } - _ => BytesMut::new(), - }; - Ok(f) - } - Err(e) => Err(e), - }; - ret - } - }); - //let _ = Box::new(s) as Box> + Unpin>; let evm = if self.events_max == 0 { usize::MAX } else { self.events_max as usize }; - self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm))); + self.chan_stream = Some(Box::pin(stream)); + self.current_fetch_info = Some(fetch_info); Ok(()) } } @@ -807,9 +823,9 @@ impl Stream for DataApiPython3DataStream { { let n = Instant::now(); let mut sb = crate::status_board().unwrap(); - sb.mark_alive(&self.status_id); + sb.mark_alive(self.reqctx.reqid()); self.ping_last = n; - sb.mark_ok(&self.status_id); + sb.mark_ok(self.reqctx.reqid()); } continue; } @@ -864,7 +880,7 @@ impl Api1EventsBinaryHandler { .to_owned(); let body_data = hyper::body::to_bytes(body).await?; if body_data.len() < 1024 * 2 && body_data.first() == Some(&"{".as_bytes()[0]) { - info!("request body_data string: {}", String::from_utf8_lossy(&body_data)); + debug!("request body_data string: {}", String::from_utf8_lossy(&body_data)); } let qu = match serde_json::from_slice::(&body_data) { Ok(mut qu) => { @@ -879,6 +895,8 @@ impl Api1EventsBinaryHandler { return Err(Error::with_msg_no_trace("can not parse query")); } }; + let reqid = super::status_board()?.new_status_id(); + let reqctx = netpod::ReqCtx::new(reqid); let span = if qu.log_level() == "trace" { debug!("enable trace for handler"); tracing::span!(tracing::Level::TRACE, "log_span_trace") @@ -888,8 +906,10 @@ impl Api1EventsBinaryHandler { } else { tracing::Span::none() }; - self.handle_for_query(qu, accept, span.clone(), node_config) + let reqidspan = tracing::info_span!("api1query", reqid = reqctx.reqid()); + self.handle_for_query(qu, accept, &reqctx, span.clone(), reqidspan.clone(), node_config) .instrument(span) + .instrument(reqidspan) .await } @@ -897,12 +917,14 @@ impl Api1EventsBinaryHandler { &self, qu: Api1Query, accept: String, + reqctx: &ReqCtxArc, span: tracing::Span, + reqidspan: tracing::Span, ncc: &NodeConfigCached, ) -> Result, Error> { let self_name = any::type_name::(); // TODO this should go to usage statistics: - info!( + debug!( "{self_name} {:?} {} {:?}", qu.range(), qu.channels().len(), @@ -925,24 +947,24 @@ impl Api1EventsBinaryHandler { let ts1 = Instant::now(); let mut chans = Vec::new(); for ch in qu.channels() { - info!("try to find config quorum for {ch:?}"); + debug!("try to find config quorum for {ch:?}"); let ch = SfDbChannel::from_name(backend, ch.name()); let ch_conf = nodenet::configquorum::find_config_basics_quorum(ch.clone(), range.clone().into(), ncc).await?; match ch_conf { Some(x) => { + debug!("found quorum {ch:?} {x:?}"); chans.push(x); } None => { + // TODO count in request ctx. error!("no config quorum found for {ch:?}"); } } } let ts2 = Instant::now(); let dt = ts2.duration_since(ts1).as_millis(); - info!("{self_name} configs fetched in {} ms", dt); - // TODO use a better stream protocol with built-in error delivery. 
- let status_id = super::status_board()?.new_status_id(); + debug!("{self_name} {} configs fetched in {} ms", chans.len(), dt); let s = DataApiPython3DataStream::new( range.clone(), chans, @@ -951,12 +973,12 @@ impl Api1EventsBinaryHandler { DiskIoTune::default(), qu.decompress(), qu.events_max().unwrap_or(u64::MAX), - status_id.clone(), + reqctx.clone(), ncc.clone(), ); - let s = s.instrument(span); - let body = BodyStream::wrapped(s, format!("Api1EventsBinaryHandler")); - let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", status_id); + let s = s.instrument(span).instrument(reqidspan); + let body = Body::wrap_stream(s); + let ret = response(StatusCode::OK).header(X_DAQBUF_REQID, reqctx.reqid()); let ret = ret.body(body)?; Ok(ret) } else { diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index f6c787b..9966048 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -1,8 +1,8 @@ use crate::bodystream::response; use crate::bodystream::ToPublicResponse; use crate::channelconfig::ch_conf_from_binned; -use crate::err::Error; use crate::response_err; +use err::Error; use http::Method; use http::Request; use http::Response; @@ -20,7 +20,8 @@ use tracing::Instrument; use url::Url; async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCached) -> Result, Error> { - debug!("httpret plain_events_json req: {:?}", req); + debug!("{:?}", req); + let reqid = crate::status_board()?.new_status_id(); let (_head, _body) = req.into_parts(); let query = BinnedQuery::from_url(&url).map_err(|e| { error!("binned_json: {e:?}"); @@ -41,7 +42,7 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache span1.in_scope(|| { debug!("begin"); }); - let item = streams::timebinnedjson::timebinned_json(query, ch_conf, node_config.node_config.cluster.clone()) + let item = streams::timebinnedjson::timebinned_json(query, ch_conf, reqid, node_config.node_config.cluster.clone()) .instrument(span1) .await?; let buf = serde_json::to_vec(&item)?; diff --git a/crates/httpret/src/api4/databuffer_tools.rs b/crates/httpret/src/api4/databuffer_tools.rs index 4ab9208..68701da 100644 --- a/crates/httpret/src/api4/databuffer_tools.rs +++ b/crates/httpret/src/api4/databuffer_tools.rs @@ -1,33 +1,40 @@ use crate::bodystream::response; -use crate::bodystream::BodyStream; use crate::response_err; +use async_channel::Receiver; +use async_channel::Sender; use bytes::Bytes; use err::thiserror; +use err::ThisError; use err::ToPublicError; use futures_util::Stream; +use futures_util::StreamExt; use http::Method; use http::Request; use http::Response; use http::StatusCode; use hyper::Body; use netpod::log::*; +use netpod::Node; use netpod::NodeConfigCached; use netpod::ACCEPT_ALL; use netpod::APP_JSON; +use serde::Serialize; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use url::Url; -#[derive(Debug, thiserror::Error)] +#[derive(Debug, ThisError)] pub enum FindActiveError { - #[error("HttpBadAccept")] HttpBadAccept, - #[error("HttpBadUrl")] HttpBadUrl, - #[error("{0}")] + #[error("Error({0})")] Error(Box), - #[error("{0}")] + #[error("UrlError({0})")] UrlError(#[from] url::ParseError), - #[error("InternalError")] InternalError, + IO(#[from] std::io::Error), } impl ToPublicError for FindActiveError { @@ -36,8 +43,9 @@ impl ToPublicError for FindActiveError { FindActiveError::HttpBadAccept => format!("{self}"), FindActiveError::HttpBadUrl => format!("{self}"), FindActiveError::Error(e) => 
e.to_public_error(), - FindActiveError::UrlError(e) => format!("can not parse url: {e}"), + FindActiveError::UrlError(_) => format!("{self}"), FindActiveError::InternalError => format!("{self}"), + FindActiveError::IO(_) => format!("{self}"), } } } @@ -46,7 +54,7 @@ pub struct FindActiveHandler {} impl FindActiveHandler { pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/tools/databuffer/findActive" { + if req.uri().path() == "/api/4/tool/sfdatabuffer/find/channel/active" { Some(Self {}) } else { None @@ -83,28 +91,222 @@ impl FindActiveHandler { }; if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { type _A = netpod::BodyStream; - Ok(Response::builder() - .status(StatusCode::OK) - .body(BodyStream::wrapped(Box::pin(DummyStream::new()), "find_active".into())) - .map_err(|_| FindActiveError::InternalError)?) + let stream = FindActiveStream::new(40, 2, ncc); + let stream = stream.chain(FindActiveStream::new(40, 3, ncc)); + let stream = stream + .map(|item| match item { + Ok(item) => { + let mut s = serde_json::to_vec(&item).unwrap(); + s.push(0x0a); + s + } + Err(e) => { + error!("ERROR in http body stream after headers: {e}"); + Vec::new() + } + }) + .map(|x| Ok::<_, String>(Bytes::from(x))); + let body = Body::wrap_stream(Box::pin(stream)); + Ok(Response::builder().status(StatusCode::OK).body(body).unwrap()) } else { Err(FindActiveError::HttpBadAccept) } } } -struct DummyStream {} +#[derive(Debug, Serialize)] +struct ActiveChannelDesc { + ks: u32, + name: String, + totlen: u64, +} -impl DummyStream { - pub fn new() -> Self { - todo!() +async fn sum_dir_contents(path: PathBuf) -> Result { + let mut sum = 0; + let mut dir_stream = tokio::fs::read_dir(path).await?; + loop { + match dir_stream.next_entry().await? { + Some(x) => { + if x.file_name().to_string_lossy().starts_with("..") { + debug!("INCONVENIENT: {x:?}"); + } else if x.file_type().await.unwrap().is_dir() { + let mut dir_stream_2 = tokio::fs::read_dir(x.path()).await?; + loop { + match dir_stream_2.next_entry().await? 
{ + Some(x) => { + let md = x.metadata().await?; + sum += md.len(); + } + None => break, + } + } + } else { + error!("unexpected file: {:?}", x.file_name()); + sum += x.metadata().await?.len(); + } + } + None => break, + } + } + Ok(sum) +} + +struct XorShift32 { + state: u32, +} + +impl XorShift32 { + fn new(state: u32) -> Self { + Self { state } + } + + fn next(&mut self) -> u32 { + let mut x = self.state; + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + self.state = x; + x } } -impl Stream for DummyStream { - type Item = Result; +async fn find_active_inner( + max: usize, + ks: u32, + splits: &[u64], + node: Node, + tx: Sender>, +) -> Result<(), FindActiveError> { + let mut count = 0; + let now_sec = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let mut rng = XorShift32::new(now_sec as u32); + for _ in 0..64 { + rng.next(); + } + let tb_exp = now_sec / 60 / 60 / 24; + let re_tb = regex::Regex::new(r"(0000\d{15})").unwrap(); + let path = disk::paths::datapath_for_keyspace(ks, &node); + let mut dir_stream = tokio::fs::read_dir(path).await?; + let mut channel_dirs = Vec::new(); + loop { + let x = dir_stream.next_entry().await?; + match x { + Some(x) => { + if x.file_name().to_string_lossy().starts_with(".") { + debug!("INCONVENIENT: {x:?}"); + } else if x.file_name().to_string_lossy().starts_with("..") { + debug!("INCONVENIENT: {x:?}"); + } else { + channel_dirs.push((rng.next(), x)); + } + } + None => break, + } + } + channel_dirs.sort_by_key(|x| x.0); + let channel_dirs = channel_dirs; + // TODO randomize channel list using given seed + 'outer: for (_, chdir) in channel_dirs { + let ft = chdir.file_type().await?; + if ft.is_dir() { + let mut dir_stream = tokio::fs::read_dir(chdir.path()) + .await + .map_err(|e| FindActiveError::IO(e))?; + loop { + match dir_stream.next_entry().await? 
{ + Some(e) => { + let x = e.file_name(); + let s = x.to_string_lossy(); + if let Some(_) = re_tb.captures(&s) { + let chn1 = chdir.file_name(); + let chname = chn1.to_string_lossy(); + // debug!("match: {m:?}"); + // TODO bin-size depends on channel config + match s.parse::() { + Ok(x) => { + if x == tb_exp { + // debug!("matching tb {}", chname); + let sum = sum_dir_contents(e.path()).await?; + if sum > 1024 * 1024 * 10 { + // debug!("sizable content: {sum}"); + let x = ActiveChannelDesc { + ks, + name: chname.into(), + totlen: sum, + }; + tx.send(Ok(x)).await; + count += 1; + if count >= max { + break 'outer; + } + } + } + } + Err(_) => {} + } + } else { + // debug!("no match"); + } + } + None => break, + } + } + } else { + error!("unexpected file {chdir:?}"); + } + } + Ok(()) +} - fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll> { - todo!() +async fn find_active( + max: usize, + ks: u32, + splits: Vec, + node: Node, + tx: Sender>, +) { + let tx2 = tx.clone(); + match find_active_inner(max, ks, &splits, node, tx).await { + Ok(x) => x, + Err(e) => { + tx2.send(Err(e)).await; + return; + } + } +} + +struct FindActiveStream { + rx: Receiver>, +} + +impl FindActiveStream { + pub fn new(max: usize, ks: u32, ncc: &NodeConfigCached) -> Self { + let (tx, rx) = async_channel::bounded(4); + let splits = ncc + .node + .sf_databuffer + .as_ref() + .unwrap() + .splits + .as_ref() + .map_or(Vec::new(), Clone::clone); + let _jh = taskrun::spawn(find_active(max, ks, splits, ncc.node.clone(), tx)); + Self { rx } + } +} + +impl Stream for FindActiveStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.rx.poll_next_unpin(cx) { + Ready(Some(item)) => Ready(Some(item)), + Ready(None) => Ready(None), + Pending => Pending, + } } } diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 0147467..68bdc04 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -1,9 +1,8 @@ use crate::channelconfig::chconf_from_events_v1; -use crate::err::Error; use crate::response; use crate::response_err; -use crate::BodyStream; use crate::ToPublicResponse; +use err::Error; use futures_util::stream; use futures_util::TryStreamExt; use http::Method; @@ -72,15 +71,12 @@ async fn plain_events_binary( req: Request, node_config: &NodeConfigCached, ) -> Result, Error> { - debug!("plain_events_binary req: {:?}", req); + debug!("{:?}", req); let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; let ch_conf = chconf_from_events_v1(&query, node_config).await?; info!("plain_events_binary chconf_from_events_v1: {ch_conf:?}"); let s = stream::iter([Ok::<_, Error>(String::from("TODO_PREBINNED_BINARY_STREAM"))]); - let ret = response(StatusCode::OK).body(BodyStream::wrapped( - s.map_err(Error::from), - format!("plain_events_binary"), - ))?; + let ret = response(StatusCode::OK).body(Body::wrap_stream(s.map_err(Error::from)))?; Ok(ret) } @@ -89,6 +85,7 @@ async fn plain_events_json( req: Request, node_config: &NodeConfigCached, ) -> Result, Error> { + let reqid = crate::status_board()?.new_status_id(); info!("plain_events_json req: {:?}", req); let (_head, _body) = req.into_parts(); let query = PlainEventsQuery::from_url(&url)?; @@ -99,7 +96,8 @@ async fn plain_events_json( .map_err(Error::from)? 
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; info!("plain_events_json chconf_from_events_v1: {ch_conf:?}"); - let item = streams::plaineventsjson::plain_events_json(&query, ch_conf, &node_config.node_config.cluster).await; + let item = + streams::plaineventsjson::plain_events_json(&query, ch_conf, reqid, &node_config.node_config.cluster).await; let item = match item { Ok(item) => item, Err(e) => { diff --git a/crates/httpret/src/api4/search.rs b/crates/httpret/src/api4/search.rs index 44516c7..1b2a544 100644 --- a/crates/httpret/src/api4/search.rs +++ b/crates/httpret/src/api4/search.rs @@ -1,6 +1,6 @@ use crate::bodystream::response; use crate::bodystream::ToPublicResponse; -use crate::Error; +use err::Error; use http::Method; use http::Request; use http::Response; diff --git a/crates/httpret/src/api4/status.rs b/crates/httpret/src/api4/status.rs index b2f816d..63cfc5b 100644 --- a/crates/httpret/src/api4/status.rs +++ b/crates/httpret/src/api4/status.rs @@ -1,6 +1,6 @@ use crate::bodystream::response; -use crate::err::Error; use crate::ReqCtx; +use crate::RetrievalError; use http::Request; use http::Response; use http::StatusCode; @@ -14,7 +14,7 @@ use std::collections::VecDeque; use std::time::Duration; #[allow(unused)] -async fn table_sizes(node_config: &NodeConfigCached) -> Result { +async fn table_sizes(node_config: &NodeConfigCached) -> Result { let ret = dbconn::table_sizes(node_config).await?; Ok(ret) } @@ -39,12 +39,12 @@ impl StatusNodesRecursive { req: Request, ctx: &ReqCtx, node_config: &NodeConfigCached, - ) -> Result, Error> { + ) -> Result, RetrievalError> { let res = tokio::time::timeout(Duration::from_millis(1200), self.status(req, ctx, node_config)).await; let res = match res { Ok(res) => res, Err(e) => { - let e = Error::from(e).add_public_msg("see timeout"); + let e = RetrievalError::from(e).add_public_msg("see timeout"); return Ok(crate::bodystream::ToPublicResponse::to_public_response(&e)); } }; @@ -67,7 +67,7 @@ impl StatusNodesRecursive { req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, - ) -> Result { + ) -> Result { let (_head, _body) = req.into_parts(); let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() { Some(k) => { diff --git a/crates/httpret/src/bodystream.rs b/crates/httpret/src/bodystream.rs index 92d5e40..1bdb1ef 100644 --- a/crates/httpret/src/bodystream.rs +++ b/crates/httpret/src/bodystream.rs @@ -1,16 +1,14 @@ -use crate::err::Error; -use bytes::Bytes; -use futures_util::{Stream, StreamExt}; +use err::Error; +use futures_util::StreamExt; use http::HeaderMap; -use http::{Response, StatusCode}; +use http::Response; +use http::StatusCode; use hyper::Body; use netpod::log::*; use netpod::APP_JSON; -use std::panic::AssertUnwindSafe; use std::pin::Pin; -use std::task::{Context, Poll}; -use tracing::field::Empty; -use tracing::{span, Level}; +use std::task::Context; +use std::task::Poll; pub fn response(status: T) -> http::response::Builder where @@ -20,58 +18,6 @@ where Response::builder().status(status) } -pub struct BodyStream { - inp: S, - desc: String, -} - -impl BodyStream -where - S: Stream> + Unpin + Send + 'static, - I: Into + Sized + 'static, -{ - pub fn new(inp: S, desc: String) -> Self { - Self { inp, desc } - } - - pub fn wrapped(inp: S, desc: String) -> Body { - Body::wrap_stream(Self::new(inp, desc)) - } -} - -impl Stream for BodyStream -where - S: Stream> + Unpin, - I: Into + Sized, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - 
let span1 = span!(Level::INFO, "httpret::BodyStream", desc = Empty); - span1.record("desc", &self.desc.as_str()); - span1.in_scope(|| { - use Poll::*; - let t = std::panic::catch_unwind(AssertUnwindSafe(|| self.inp.poll_next_unpin(cx))); - match t { - Ok(r) => match r { - Ready(Some(Ok(k))) => Ready(Some(Ok(k))), - Ready(Some(Err(e))) => { - error!("body stream error: {e:?}"); - Ready(Some(Err(Error::from(e)))) - } - Ready(None) => Ready(None), - Pending => Pending, - }, - Err(e) => { - error!("panic caught in httpret::BodyStream: {e:?}"); - let e = Error::with_msg(format!("panic caught in httpret::BodyStream: {e:?}")); - Ready(Some(Err(e))) - } - } - }) - } -} - pub trait ToPublicResponse { fn to_public_response(&self) -> Response; } @@ -82,34 +28,6 @@ impl ToPublicResponse for Error { } } -impl ToPublicResponse for ::err::Error { - fn to_public_response(&self) -> Response { - use err::Reason; - let e = self.to_public_error(); - let status = match e.reason() { - Some(Reason::BadRequest) => StatusCode::BAD_REQUEST, - Some(Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR, - _ => StatusCode::INTERNAL_SERVER_ERROR, - }; - let msg = match serde_json::to_string(&e) { - Ok(s) => s, - Err(_) => "can not serialize error".into(), - }; - match response(status) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::from(msg)) - { - Ok(res) => res, - Err(e) => { - error!("can not generate http error response {e:?}"); - let mut res = Response::new(Body::default()); - *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - res - } - } - } -} - struct BodyStreamWrap(netpod::BodyStream); impl hyper::body::HttpBody for BodyStreamWrap { diff --git a/crates/httpret/src/channel_status.rs b/crates/httpret/src/channel_status.rs index 0da4caf..7d4fb6c 100644 --- a/crates/httpret/src/channel_status.rs +++ b/crates/httpret/src/channel_status.rs @@ -1,6 +1,6 @@ use crate::bodystream::response; -use crate::err::Error; use crate::ReqCtx; +use err::Error; use futures_util::StreamExt; use http::Method; use http::Request; diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 8ea5873..80ca2d8 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -1,7 +1,7 @@ -use crate::err::Error; use crate::response; use crate::ToPublicResponse; use dbconn::create_connection; +use err::Error; use futures_util::StreamExt; use http::Method; use http::Request; @@ -22,6 +22,7 @@ use netpod::SfDbChannel; use netpod::Shape; use netpod::ACCEPT_ALL; use netpod::APP_JSON; +use nodenet::configquorum::find_config_basics_quorum; use query::api4::binned::BinnedQuery; use query::api4::events::PlainEventsQuery; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; @@ -37,7 +38,7 @@ pub async fn chconf_from_events_v1( q: &PlainEventsQuery, ncc: &NodeConfigCached, ) -> Result, Error> { - let ret = nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; + let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; Ok(ret) } @@ -45,8 +46,7 @@ pub async fn chconf_from_prebinned( q: &PreBinnedQuery, ncc: &NodeConfigCached, ) -> Result, Error> { - let ret = - nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ncc).await?; + let ret = find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ncc).await?; Ok(ret) } @@ -54,7 +54,7 @@ pub async fn ch_conf_from_binned( q: &BinnedQuery, ncc: &NodeConfigCached, ) -> 
Result, Error> { - let ret = nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; + let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; Ok(ret) } diff --git a/crates/httpret/src/download.rs b/crates/httpret/src/download.rs index 3519033..6277bd0 100644 --- a/crates/httpret/src/download.rs +++ b/crates/httpret/src/download.rs @@ -1,10 +1,16 @@ -use crate::err::Error; use crate::response; +use crate::RetrievalError; use futures_util::TryStreamExt; -use http::{Method, StatusCode}; -use hyper::{Body, Request, Response}; +use http::Method; +use http::StatusCode; +use hyper::Body; +use hyper::Request; +use hyper::Response; +use netpod::get_url_query_pairs; use netpod::log::*; -use netpod::{get_url_query_pairs, DiskIoTune, FromUrl, NodeConfigCached}; +use netpod::DiskIoTune; +use netpod::FromUrl; +use netpod::NodeConfigCached; use url::Url; #[derive(Clone, Debug)] @@ -61,7 +67,11 @@ impl DownloadHandler { } } - pub async fn get(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn get( + &self, + req: Request, + node_config: &NodeConfigCached, + ) -> Result, RetrievalError> { let (head, _body) = req.into_parts(); let p2 = &head.uri.path()[Self::path_prefix().len()..]; let base = match &node_config.node.sf_databuffer { @@ -74,11 +84,15 @@ impl DownloadHandler { let pp = base.join(p2); info!("Try to open {pp:?}"); let file = tokio::fs::OpenOptions::new().read(true).open(&pp).await?; - let s = disk::file_content_stream(pp, file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf()); + let s = disk::file_content_stream(pp, file, query.disk_io_tune.clone(), "download").map_ok(|x| x.into_buf()); Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?) 
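DownloadHandler::get above opens the requested file and streams it to the client without buffering the whole file in memory. A sketch of the same response shape, assuming tokio_util's ReaderStream as a stand-in for disk::file_content_stream and its DiskIoTune parameter; the path used in main is only an example:

    use http::{Response, StatusCode};
    use hyper::Body;
    use tokio_util::io::ReaderStream;

    // Stand-in for disk::file_content_stream: stream a file chunk by chunk.
    async fn stream_file(path: &std::path::Path) -> Result<Response<Body>, std::io::Error> {
        let file = tokio::fs::OpenOptions::new().read(true).open(path).await?;
        // ReaderStream yields Result<Bytes, io::Error>, which Body::wrap_stream accepts.
        let stream = ReaderStream::new(file);
        let res = Response::builder()
            .status(StatusCode::OK)
            .body(Body::wrap_stream(stream))
            .expect("static parts are valid");
        Ok(res)
    }

    #[tokio::main]
    async fn main() -> Result<(), std::io::Error> {
        let res = stream_file(std::path::Path::new("Cargo.toml")).await?;
        println!("status {}", res.status());
        Ok(())
    }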
} - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle( + &self, + req: Request, + node_config: &NodeConfigCached, + ) -> Result, RetrievalError> { if req.method() == Method::GET { self.get(req, node_config).await } else { diff --git a/crates/httpret/src/gather.rs b/crates/httpret/src/gather.rs index 98a9f84..370c445 100644 --- a/crates/httpret/src/gather.rs +++ b/crates/httpret/src/gather.rs @@ -1,12 +1,19 @@ -use crate::err::Error; use crate::response; -use futures_util::{select, FutureExt}; -use http::{Method, StatusCode}; -use hyper::{Body, Client, Request, Response}; +use crate::RetrievalError; +use futures_util::select; +use futures_util::FutureExt; +use http::Method; +use http::StatusCode; +use hyper::Body; +use hyper::Client; +use hyper::Request; +use hyper::Response; use netpod::log::*; +use netpod::Node; +use netpod::NodeConfigCached; use netpod::APP_JSON; -use netpod::{Node, NodeConfigCached}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use serde_json::Value as JsonValue; use std::future::Future; use std::pin::Pin; @@ -26,7 +33,7 @@ struct GatherHost { inst: String, } -async fn process_answer(res: Response) -> Result { +async fn process_answer(res: Response) -> Result { let (pre, mut body) = res.into_parts(); if pre.status != StatusCode::OK { use hyper::body::HttpBody; @@ -48,11 +55,14 @@ async fn process_answer(res: Response) -> Result { Ok(k) => k, Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?), }; - Ok::<_, Error>(val) + Ok::<_, RetrievalError>(val) } } -pub async fn unused_gather_json_from_hosts(req: Request, pathpre: &str) -> Result, Error> { +pub async fn unused_gather_json_from_hosts( + req: Request, + pathpre: &str, +) -> Result, RetrievalError> { let (part_head, part_body) = req.into_parts(); let bodyslice = hyper::body::to_bytes(part_body).await?; let gather_from: GatherFrom = serde_json::from_slice(&bodyslice)?; @@ -72,7 +82,7 @@ pub async fn unused_gather_json_from_hosts(req: Request, pathpre: &str) -> let task = tokio::spawn(async move { select! { _ = sleep(Duration::from_millis(1500)).fuse() => { - Err(Error::with_msg("timeout")) + Err(RetrievalError::with_msg("timeout")) } res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?) } @@ -105,7 +115,10 @@ pub async fn unused_gather_json_from_hosts(req: Request, pathpre: &str) -> Ok(res) } -pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +pub async fn gather_get_json( + req: Request, + node_config: &NodeConfigCached, +) -> Result, RetrievalError> { let (head, body) = req.into_parts(); let _bodyslice = hyper::body::to_bytes(body).await?; let pathpre = "/api/4/gather/"; @@ -123,7 +136,7 @@ pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) let task = tokio::spawn(async move { select! { _ = sleep(Duration::from_millis(1500)).fuse() => { - Err(Error::with_msg("timeout")) + Err(RetrievalError::with_msg("timeout")) } res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?) } @@ -181,23 +194,23 @@ pub async fn gather_get_json_generic( // TODO use deadline instead. // TODO Wait a bit longer compared to remote to receive partial results. 
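gather_get_json and gather_get_json_generic fan a request out to every node, give each sub-request its own deadline, and merge whatever came back in time. The patch keeps the futures select! between a sleep and the hyper request; the sketch below shows the same per-task deadline idea with tokio::time::timeout and a simulated request so that hyper is not needed:

    use std::time::Duration;
    use tokio::time::{sleep, timeout};

    // Simulated per-host request; the real code issues hyper::Client requests
    // and parses each body as JSON.
    async fn fetch_one(host: String, delay_ms: u64) -> Result<String, String> {
        sleep(Duration::from_millis(delay_ms)).await;
        Ok(format!("answer from {}", host))
    }

    #[tokio::main]
    async fn main() {
        let hosts = vec![("node-a".to_string(), 50u64), ("node-b".to_string(), 5_000)];
        let tasks: Vec<_> = hosts
            .into_iter()
            .map(|(host, delay)| {
                tokio::spawn(async move {
                    // Each sub-request gets its own deadline, like the 1500 ms
                    // sleep arm in gather_get_json.
                    match timeout(Duration::from_millis(1500), fetch_one(host.clone(), delay)).await {
                        Ok(res) => (host, res),
                        Err(_) => (host, Err("timeout".to_string())),
                    }
                })
            })
            .collect();
        for t in tasks {
            // JoinError only happens if the task panicked.
            let (host, res) = t.await.expect("task not panicked");
            println!("{host}: {res:?}");
        }
    }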
timeout: Duration, -) -> Result +) -> Result where SM: Send + 'static, - NT: Fn(String, Response) -> Pin, Error>> + Send>> + NT: Fn(String, Response) -> Pin, RetrievalError>> + Send>> + Send + Sync + Copy + 'static, - FT: Fn(Vec<(Tag, Result, Error>)>) -> Result, + FT: Fn(Vec<(Tag, Result, RetrievalError>)>) -> Result, { // TODO remove magic constant let extra_timeout = Duration::from_millis(3000); if urls.len() != bodies.len() { - return Err(Error::with_msg_no_trace("unequal numbers of urls and bodies")); + return Err(RetrievalError::TextError(format!("unequal numbers of urls and bodies"))); } if urls.len() != tags.len() { - return Err(Error::with_msg_no_trace("unequal numbers of urls and tags")); + return Err(RetrievalError::TextError(format!("unequal numbers of urls and tags"))); } let spawned: Vec<_> = urls .into_iter() @@ -227,7 +240,7 @@ where select! { _ = sleep(timeout + extra_timeout).fuse() => { error!("PROXY TIMEOUT"); - Err(Error::with_msg_no_trace("timeout")) + Err(RetrievalError::TextError(format!("timeout"))) } res = { let client = Client::new(); diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 3b00f35..54e5e92 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -4,19 +4,18 @@ pub mod bodystream; pub mod channel_status; pub mod channelconfig; pub mod download; -pub mod err; pub mod gather; pub mod prometheus; pub mod proxy; pub mod pulsemap; pub mod settings; -use self::bodystream::BodyStream; use self::bodystream::ToPublicResponse; use crate::bodystream::response; -use crate::err::Error; use crate::gather::gather_get_json; use crate::pulsemap::UpdateTask; +use err::thiserror; +use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::StreamExt; @@ -41,6 +40,7 @@ use nodenet::conn::events_service; use panic::AssertUnwindSafe; use panic::UnwindSafe; use pin::Pin; +use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; use std::net; @@ -59,7 +59,37 @@ use task::Poll; pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark"; pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url"; -pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { +#[derive(Debug, ThisError, Serialize, Deserialize)] +pub enum RetrievalError { + Error(#[from] err::Error), + TextError(String), + #[serde(skip)] + Hyper(#[from] hyper::Error), + #[serde(skip)] + Http(#[from] http::Error), + #[serde(skip)] + Serde(#[from] serde_json::Error), + #[serde(skip)] + Fmt(#[from] std::fmt::Error), + #[serde(skip)] + Url(#[from] url::ParseError), +} + +trait IntoBoxedError: std::error::Error {} +impl IntoBoxedError for net::AddrParseError {} +impl IntoBoxedError for tokio::task::JoinError {} +impl IntoBoxedError for api4::databuffer_tools::FindActiveError {} + +impl From for RetrievalError +where + E: ToString + IntoBoxedError, +{ + fn from(value: E) -> Self { + Self::TextError(value.to_string()) + } +} + +pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> { static STATUS_BOARD_INIT: Once = Once::new(); STATUS_BOARD_INIT.call_once(|| { let b = StatusBoard::new(); @@ -84,7 +114,7 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { let node_config = node_config.clone(); let addr = conn.remote_addr(); async move { - Ok::<_, Error>(service_fn({ + Ok::<_, RetrievalError>(service_fn({ move |req| { // TODO send to logstash info!( @@ -106,7 +136,7 @@ pub async fn host(node_config: NodeConfigCached) -> 
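RetrievalError introduced above combines #[from] conversions for the common error types with a marker trait (IntoBoxedError) plus a blanket From impl that stringifies everything else into TextError. A compileable sketch of that shape using thiserror and std error types only; IntoText and the variant names are illustrative, and the serde attributes of the original are left out:

    use thiserror::Error;

    #[derive(Debug, Error)]
    enum RetrievalError {
        #[error("io: {0}")]
        Io(#[from] std::io::Error),
        #[error("fmt: {0}")]
        Fmt(#[from] std::fmt::Error),
        // Catch-all for error types that only need their message preserved.
        #[error("{0}")]
        Text(String),
    }

    // Marker trait: opt specific error types into the string conversion.
    trait IntoText: std::error::Error {}
    impl IntoText for std::net::AddrParseError {}
    impl IntoText for std::num::ParseIntError {}

    impl<E> From<E> for RetrievalError
    where
        E: ToString + IntoText,
    {
        fn from(value: E) -> Self {
            Self::Text(value.to_string())
        }
    }

    fn main() {
        let e: RetrievalError = "not-an-addr".parse::<std::net::IpAddr>().unwrap_err().into();
        println!("{e}");
    }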
Result<(), Error> { Ok(()) } -async fn http_service(req: Request, node_config: NodeConfigCached) -> Result, Error> { +async fn http_service(req: Request, node_config: NodeConfigCached) -> Result, RetrievalError> { match http_service_try(req, &node_config).await { Ok(k) => Ok(k), Err(e) => { @@ -122,7 +152,7 @@ struct Cont { impl Future for Cont where - F: Future>, + F: Future>, { type Output = ::Output; @@ -131,14 +161,14 @@ where match h { Ok(k) => k, Err(e) => { - error!("Cont catch_unwind {:?}", e); - match e.downcast_ref::() { + error!("Cont catch_unwind {e:?}"); + match e.downcast_ref::() { Some(e) => { - error!("Cont catch_unwind is Error: {:?}", e); + error!("Cont catch_unwind is Error: {e:?}"); } None => {} } - Poll::Ready(Err(Error::with_msg(format!("{:?}", e)))) + Poll::Ready(Err(RetrievalError::TextError(format!("{e:?}")))) } } } @@ -182,7 +212,7 @@ impl ReqCtx { } // TODO remove because I want error bodies to be json. -pub fn response_err(status: StatusCode, msg: T) -> Result, Error> +pub fn response_err(status: StatusCode, msg: T) -> Result, RetrievalError> where T: AsRef, { @@ -241,7 +271,10 @@ macro_rules! static_http_api1 { }; } -async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +async fn http_service_try( + req: Request, + node_config: &NodeConfigCached, +) -> Result, RetrievalError> { use http::HeaderValue; let mut urlmarks = Vec::new(); urlmarks.push(format!("{}:{}", req.method(), req.uri())); @@ -271,7 +304,7 @@ async fn http_service_inner( req: Request, ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, Error> { +) -> Result, RetrievalError> { let uri = req.uri().clone(); let path = uri.path(); if path == "/api/4/private/version" { @@ -313,6 +346,8 @@ async fn http_service_inner( h.handle(req, ctx, &node_config).await } else if let Some(h) = StatusBoardAllHandler::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = api4::databuffer_tools::FindActiveHandler::handler(&req) { + Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { @@ -436,7 +471,7 @@ async fn http_service_inner( } } -pub fn api_4_docs(path: &str) -> Result, Error> { +pub fn api_4_docs(path: &str) -> Result, RetrievalError> { static_http!(path, "", "api4.html", "text/html"); static_http!(path, "style.css", "text/css"); static_http!(path, "script.js", "text/javascript"); @@ -444,7 +479,7 @@ pub fn api_4_docs(path: &str) -> Result, Error> { Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) 
} -pub fn api_1_docs(path: &str) -> Result, Error> { +pub fn api_1_docs(path: &str) -> Result, RetrievalError> { static_http_api1!(path, "", "api1.html", "text/html"); static_http_api1!(path, "style.css", "text/css"); static_http_api1!(path, "script.js", "text/javascript"); @@ -462,7 +497,11 @@ impl StatusBoardAllHandler { } } - pub async fn handle(&self, _req: Request, _node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle( + &self, + _req: Request, + _node_config: &NodeConfigCached, + ) -> Result, RetrievalError> { use std::ops::Deref; let sb = status_board().unwrap(); let buf = serde_json::to_vec(sb.deref()).unwrap(); @@ -471,12 +510,16 @@ impl StatusBoardAllHandler { } } -async fn prebinned(req: Request, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result, Error> { +async fn prebinned( + req: Request, + ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result, RetrievalError> { match prebinned_inner(req, ctx, node_config).await { Ok(ret) => Ok(ret), Err(e) => { error!("fn prebinned: {e:?}"); - Ok(response(StatusCode::BAD_REQUEST).body(Body::from(e.msg().to_string()))?) + Ok(response(StatusCode::BAD_REQUEST).body(Body::from(format!("[prebinned-error]")))?) } } } @@ -485,7 +528,7 @@ async fn prebinned_inner( req: Request, _ctx: &ReqCtx, _node_config: &NodeConfigCached, -) -> Result, Error> { +) -> Result, RetrievalError> { let (head, _body) = req.into_parts(); let url: url::Url = format!("dummy://{}", head.uri).parse()?; let query = PreBinnedQuery::from_url(&url)?; @@ -493,7 +536,7 @@ async fn prebinned_inner( span1.in_scope(|| { debug!("begin"); }); - error!("TODO hhtpret prebinned_inner"); + error!("TODO httpret prebinned_inner"); //let fut = disk::binned::prebinned::pre_binned_bytes_for_http(node_config, &query).instrument(span1); todo!() } @@ -502,7 +545,7 @@ async fn random_channel( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, Error> { +) -> Result, RetrievalError> { let (_head, _body) = req.into_parts(); let ret = dbconn::random_channel(node_config).await?; let ret = response(StatusCode::OK).body(Body::from(ret))?; @@ -513,7 +556,7 @@ async fn clear_cache_all( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, Error> { +) -> Result, RetrievalError> { let (head, _body) = req.into_parts(); let dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -530,7 +573,7 @@ async fn update_db_with_channel_names( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, Error> { +) -> Result, RetrievalError> { info!("httpret::update_db_with_channel_names"); let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { @@ -568,7 +611,7 @@ async fn update_db_with_channel_names_3( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, Error> { +) -> Result, RetrievalError> { let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -591,7 +634,7 @@ async fn update_db_with_all_channel_configs( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, Error> { +) -> Result, RetrievalError> { let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -614,7 +657,7 @@ async fn update_search_cache( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, Error> { +) -> Result, RetrievalError> { let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -638,8 +681,9 @@ pub struct 
StatusBoardEntry { is_error: bool, #[serde(skip_serializing_if = "is_false")] is_ok: bool, - #[serde(skip_serializing_if = "Vec::is_empty")] - errors: Vec, + // #[serde(skip_serializing_if = "Vec::is_empty")] + #[serde(skip)] + errors: Vec>, } mod instant_serde { @@ -693,10 +737,10 @@ impl StatusBoard { use std::io::Read; self.clean(); let mut f = File::open("/dev/urandom").unwrap(); - let mut buf = [0; 8]; + let mut buf = [0; 4]; f.read_exact(&mut buf).unwrap(); - let n = u64::from_le_bytes(buf); - let s = format!("{:016x}", n); + let n = u32::from_le_bytes(buf); + let s = format!("{:08x}", n); info!("new_status_id {s}"); self.entries.insert(s.clone(), StatusBoardEntry::new()); s @@ -738,13 +782,16 @@ impl StatusBoard { } } - pub fn add_error(&mut self, status_id: &str, error: Error) { + pub fn add_error(&mut self, status_id: &str, error: E) + where + E: Into>, + { match self.entries.get_mut(status_id) { Some(e) => { e.ts_updated = SystemTime::now(); e.is_error = true; e.is_ok = false; - e.errors.push(error); + e.errors.push(error.into()); } None => { error!("can not find status id {}", status_id); @@ -764,7 +811,9 @@ impl StatusBoard { let js = StatJs { errors: Vec::new() }; return serde_json::to_string(&js).unwrap(); } else if e.is_error { - let errors = e.errors.iter().map(|e| (&e.0).into()).collect(); + // TODO + // let errors = e.errors.iter().map(|e| (&e.0).into()).collect(); + let errors = vec![err::Error::with_msg_no_trace("TODO convert to user error").into()]; let js = StatJs { errors }; return serde_json::to_string(&js).unwrap(); } else { @@ -785,10 +834,10 @@ impl StatusBoard { static STATUS_BOARD: AtomicPtr> = AtomicPtr::new(std::ptr::null_mut()); -pub fn status_board() -> Result, Error> { +pub fn status_board() -> Result, RetrievalError> { let x = unsafe { &*STATUS_BOARD.load(Ordering::SeqCst) }.write(); match x { Ok(x) => Ok(x), - Err(e) => Err(Error::with_msg(format!("{e:?}"))), + Err(e) => Err(RetrievalError::TextError(format!("{e}"))), } } diff --git a/crates/httpret/src/prometheus.rs b/crates/httpret/src/prometheus.rs index 01c0ce4..95ce639 100644 --- a/crates/httpret/src/prometheus.rs +++ b/crates/httpret/src/prometheus.rs @@ -1,11 +1,20 @@ -use crate::err::Error; -use http::{HeaderMap, HeaderValue, Method, Request, Response, StatusCode}; +use crate::RetrievalError; +use http::HeaderMap; +use http::HeaderValue; +use http::Method; +use http::Request; +use http::Response; +use http::StatusCode; use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Server}; +use hyper::service::make_service_fn; +use hyper::service::service_fn; +use hyper::Body; +use hyper::Server; use netpod::log::*; -use netpod::{ACCEPT_ALL, APP_JSON}; -use serde_json::{json, Value}; +use netpod::ACCEPT_ALL; +use netpod::APP_JSON; +use serde_json::json; +use serde_json::Value; use std::collections::BTreeMap; use std::net::SocketAddr; use std::sync::Once; @@ -50,7 +59,7 @@ impl StatusBuildInfoHandler { } } - pub async fn handle(&self, req: Request) -> Result, Error> { + pub async fn handle(&self, req: Request) -> Result, RetrievalError> { info!("{} for {:?}", std::any::type_name::(), req); if req.method() == Method::GET { if accepts_json(req.headers()) { @@ -91,7 +100,7 @@ impl SeriesHandler { } } - pub async fn handle(&self, req: Request) -> Result, Error> { + pub async fn handle(&self, req: Request) -> Result, RetrievalError> { info!("{} for {:?}", std::any::type_name::(), req); if req.method() == Method::GET || req.method() == Method::POST { if 
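new_status_id above now draws 4 bytes from /dev/urandom and renders them as an 8-character hex id. The same logic as a standalone, Unix-only sketch:

    use std::fs::File;
    use std::io::Read;

    // Unix-only: derive a short request/status id from /dev/urandom,
    // 4 random bytes rendered as 8 lowercase hex characters.
    fn new_status_id() -> std::io::Result<String> {
        let mut f = File::open("/dev/urandom")?;
        let mut buf = [0u8; 4];
        f.read_exact(&mut buf)?;
        let n = u32::from_le_bytes(buf);
        Ok(format!("{:08x}", n))
    }

    fn main() -> std::io::Result<()> {
        println!("new status id: {}", new_status_id()?);
        Ok(())
    }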
accepts_json(req.headers()) { @@ -128,7 +137,7 @@ impl MetadataHandler { } } - pub async fn handle(&self, req: Request) -> Result, Error> { + pub async fn handle(&self, req: Request) -> Result, RetrievalError> { info!("{} for {:?}", std::any::type_name::(), req); if req.method() == Method::GET { if accepts_json(req.headers()) { @@ -163,7 +172,7 @@ impl LabelsHandler { } } - pub async fn handle(&self, req: Request) -> Result, Error> { + pub async fn handle(&self, req: Request) -> Result, RetrievalError> { let self_name = std::any::type_name::(); info!("{} for {:?}", self_name, req); if req.method() == Method::GET || req.method() == Method::POST { @@ -218,7 +227,7 @@ impl LabelValuesHandler { } } - pub async fn handle(&self, req: Request) -> Result, Error> { + pub async fn handle(&self, req: Request) -> Result, RetrievalError> { let self_name = std::any::type_name::(); info!("{} for {:?}", self_name, req); info!("LABEL {:?}", self.label); @@ -263,7 +272,7 @@ impl QueryHandler { } } - pub async fn handle(&self, req: Request) -> Result, Error> { + pub async fn handle(&self, req: Request) -> Result, RetrievalError> { info!("{} for {:?}", std::any::type_name::(), req); let url = url::Url::parse(&format!("dummy://{}", &req.uri())); info!("/api/v1/query parsed url: {:?}", url); @@ -295,7 +304,7 @@ impl QueryRangeHandler { } } - pub async fn handle(&self, req: Request) -> Result, Error> { + pub async fn handle(&self, req: Request) -> Result, RetrievalError> { info!("{} for {:?}", std::any::type_name::(), req); let url = url::Url::parse(&format!("dummy://{}", &req.uri())); info!("/api/v1/query_range parsed url: {:?}", url); @@ -375,7 +384,7 @@ impl QueryRangeHandler { } } -async fn http_service_inner(req: Request) -> Result, Error> { +async fn http_service_inner(req: Request) -> Result, RetrievalError> { if let Some(h) = StatusBuildInfoHandler::handler(&req) { h.handle(req).await } else if let Some(h) = SeriesHandler::handler(&req) { @@ -396,7 +405,7 @@ async fn http_service_inner(req: Request) -> Result, Error> } } -async fn http_service(req: Request) -> Result, Error> { +async fn http_service(req: Request) -> Result, RetrievalError> { match http_service_inner(req).await { Ok(k) => Ok(k), Err(e) => { @@ -406,12 +415,12 @@ async fn http_service(req: Request) -> Result, Error> { } } -pub async fn host(bind: SocketAddr) -> Result<(), Error> { +pub async fn host(bind: SocketAddr) -> Result<(), RetrievalError> { let make_service = make_service_fn({ move |conn: &AddrStream| { let addr = conn.remote_addr(); async move { - Ok::<_, Error>(service_fn({ + Ok::<_, RetrievalError>(service_fn({ move |req| { info!( "REQUEST {:?} - {:?} - {:?} - {:?}", diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index aec7af0..145bd81 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -6,7 +6,6 @@ use crate::api1::channel_search_list_v1; use crate::api1::gather_json_2_v1; use crate::api_1_docs; use crate::api_4_docs; -use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; use crate::pulsemap::MapPulseQuery; @@ -15,6 +14,7 @@ use crate::response_err; use crate::Cont; use crate::ReqCtx; use crate::PSI_DAQBUFFER_SERVICE_MARK; +use err::Error; use futures_util::pin_mut; use futures_util::Stream; use http::Method; diff --git a/crates/httpret/src/proxy/api1.rs b/crates/httpret/src/proxy/api1.rs index ae4edc0..64d224e 100644 --- a/crates/httpret/src/proxy/api1.rs +++ b/crates/httpret/src/proxy/api1.rs @@ -1,8 +1,8 @@ pub mod reqstatus; use 
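Several prometheus handlers parse query parameters by prefixing the request URI with a dummy scheme so the url crate accepts it. A sketch of that trick; the example path and parameters are made up:

    use url::Url;

    // The request URI has no scheme or host, so prefix a dummy scheme to make
    // it parseable by the url crate, then read the query pairs.
    fn query_pairs_from_uri(uri: &str) -> Result<Vec<(String, String)>, url::ParseError> {
        let url = Url::parse(&format!("dummy://{}", uri))?;
        Ok(url
            .query_pairs()
            .map(|(k, v)| (k.into_owned(), v.into_owned()))
            .collect())
    }

    fn main() -> Result<(), url::ParseError> {
        let pairs = query_pairs_from_uri("/api/v1/query_range?query=up&step=30")?;
        assert_eq!(pairs[0], ("query".to_string(), "up".to_string()));
        println!("{pairs:?}");
        Ok(())
    }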
crate::bodystream::response; -use crate::err::Error; use crate::ReqCtx; +use err::Error; use http::HeaderValue; use http::Method; use http::Request; @@ -14,6 +14,7 @@ use netpod::log::*; use netpod::query::api1::Api1Query; use netpod::ProxyConfig; use netpod::ACCEPT_ALL; +use netpod::X_DAQBUF_REQID; pub struct PythonDataApi1Query {} @@ -85,10 +86,8 @@ impl PythonDataApi1Query { } else { info!("backend returned OK"); let riq_def = HeaderValue::from_static("(none)"); - let riq = head.headers.get("x-daqbuffer-request-id").unwrap_or(&riq_def); - Ok(response(StatusCode::OK) - .header("x-daqbuffer-request-id", riq) - .body(body)?) + let riq = head.headers.get(X_DAQBUF_REQID).unwrap_or(&riq_def); + Ok(response(StatusCode::OK).header(X_DAQBUF_REQID, riq).body(body)?) } } else { Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) diff --git a/crates/httpret/src/proxy/api1/reqstatus.rs b/crates/httpret/src/proxy/api1/reqstatus.rs index 005c27a..5905ba5 100644 --- a/crates/httpret/src/proxy/api1/reqstatus.rs +++ b/crates/httpret/src/proxy/api1/reqstatus.rs @@ -1,5 +1,5 @@ use crate::bodystream::response; -use crate::err::Error; +use err::Error; use http::Method; use http::Request; use http::Response; diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 4a5a3be..612b044 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -1,12 +1,12 @@ pub mod caioclookup; use crate::bodystream::ToPublicResponse; -use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; use crate::gather::Tag; use crate::response; use crate::ReqCtx; +use err::Error; use futures_util::Future; use http::Method; use http::Request; diff --git a/crates/httpret/src/proxy/api4/caioclookup.rs b/crates/httpret/src/proxy/api4/caioclookup.rs index 105c654..ac93a16 100644 --- a/crates/httpret/src/proxy/api4/caioclookup.rs +++ b/crates/httpret/src/proxy/api4/caioclookup.rs @@ -1,6 +1,6 @@ use crate::bodystream::response; -use crate::err::Error; use crate::ReqCtx; +use err::Error; use http::Request; use http::Response; use http::StatusCode; diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index aa39bcf..57015c3 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -1,4 +1,3 @@ -use crate::err::Error; use crate::response; use async_channel::Receiver; use async_channel::Sender; @@ -7,6 +6,7 @@ use bytes::BufMut; use bytes::BytesMut; use chrono::TimeZone; use chrono::Utc; +use err::Error; use futures_util::stream::FuturesOrdered; use futures_util::stream::FuturesUnordered; use futures_util::FutureExt; diff --git a/crates/httpret/src/settings.rs b/crates/httpret/src/settings.rs index 04ebf33..ed3c78a 100644 --- a/crates/httpret/src/settings.rs +++ b/crates/httpret/src/settings.rs @@ -1,10 +1,14 @@ -use crate::err::Error; use crate::response; -use http::{Method, StatusCode}; -use hyper::{Body, Request, Response}; +use err::Error; +use http::Method; +use http::StatusCode; +use hyper::Body; +use hyper::Request; +use hyper::Response; use netpod::log::*; use netpod::NodeConfigCached; -use netpod::{ACCEPT_ALL, APP_JSON}; +use netpod::ACCEPT_ALL; +use netpod::APP_JSON; pub struct SettingsThreadsMaxHandler {} diff --git a/crates/items_0/src/streamitem.rs b/crates/items_0/src/streamitem.rs index cecfc2a..3e12eef 100644 --- a/crates/items_0/src/streamitem.rs +++ b/crates/items_0/src/streamitem.rs @@ -71,6 +71,8 @@ impl LogItem { pub type Sitemty = Result>, Error>; +pub type Sitemty2 
= Result>, E>; + #[macro_export] macro_rules! on_sitemty_range_complete { ($item:expr, $ex:expr) => { diff --git a/crates/items_2/Cargo.toml b/crates/items_2/Cargo.toml index 9823f3b..07bcf1c 100644 --- a/crates/items_2/Cargo.toml +++ b/crates/items_2/Cargo.toml @@ -27,3 +27,4 @@ items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } taskrun = { path = "../taskrun" } parse = { path = "../parse" } +bitshuffle = { path = "../bitshuffle" } diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs index 4e89ee8..da397c4 100644 --- a/crates/items_2/src/eventfull.rs +++ b/crates/items_2/src/eventfull.rs @@ -1,12 +1,17 @@ use crate::framable::FrameType; use crate::merger::Mergeable; +use bitshuffle::bitshuffle_decompress; use bytes::BytesMut; +use err::thiserror; +use err::ThisError; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::EVENT_FULL_FRAME_TYPE_ID; use items_0::Empty; use items_0::MergeError; use items_0::WithLen; +#[allow(unused)] +use netpod::log::*; use netpod::ScalarType; use netpod::Shape; use parse::channelconfig::CompressionMethod; @@ -15,15 +20,20 @@ use serde::Deserializer; use serde::Serialize; use serde::Serializer; use std::collections::VecDeque; +use std::time::Instant; + +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*) }; +} #[derive(Debug, Serialize, Deserialize)] pub struct EventFull { pub tss: VecDeque, pub pulses: VecDeque, - pub blobs: VecDeque>>, + pub blobs: VecDeque>, //#[serde(with = "decomps_serde")] - // TODO allow access to `decomps` via method which checks first if `blobs` is already the decomp. - pub decomps: VecDeque>>, pub scalar_types: VecDeque, pub be: VecDeque, pub shapes: VecDeque, @@ -74,20 +84,17 @@ impl EventFull { &mut self, ts: u64, pulse: u64, - blob: Option>, - decomp: Option>, + blob: Vec, scalar_type: ScalarType, be: bool, shape: Shape, comp: Option, ) { - let m1 = blob.as_ref().map_or(0, |x| x.len()); - let m2 = decomp.as_ref().map_or(0, |x| x.len()); - self.entry_payload_max = self.entry_payload_max.max(m1 as u64 + m2 as u64); + let m1 = blob.len(); + self.entry_payload_max = self.entry_payload_max.max(m1 as u64); self.tss.push_back(ts); self.pulses.push_back(pulse); self.blobs.push_back(blob); - self.decomps.push_back(decomp); self.scalar_types.push_back(scalar_type); self.be.push_back(be); self.shapes.push_back(shape); @@ -106,7 +113,6 @@ impl EventFull { self.tss.truncate(nkeep); self.pulses.truncate(nkeep); self.blobs.truncate(nkeep); - self.decomps.truncate(nkeep); self.scalar_types.truncate(nkeep); self.be.truncate(nkeep); self.shapes.truncate(nkeep); @@ -130,7 +136,6 @@ impl Empty for EventFull { tss: VecDeque::new(), pulses: VecDeque::new(), blobs: VecDeque::new(), - decomps: VecDeque::new(), scalar_types: VecDeque::new(), be: VecDeque::new(), shapes: VecDeque::new(), @@ -170,15 +175,12 @@ impl Mergeable for EventFull { let r = range.0..range.1; let mut max = dst.entry_payload_max; for i in r.clone() { - let m1 = self.blobs[i].as_ref().map_or(0, |x| x.len()); - let m2 = self.decomps[i].as_ref().map_or(0, |x| x.len()); - max = max.max(m1 as u64 + m2 as u64); + max = max.max(self.blobs[i].len() as _); } dst.entry_payload_max = max; dst.tss.extend(self.tss.drain(r.clone())); dst.pulses.extend(self.pulses.drain(r.clone())); dst.blobs.extend(self.blobs.drain(r.clone())); - dst.decomps.extend(self.decomps.drain(r.clone())); dst.scalar_types.extend(self.scalar_types.drain(r.clone())); 
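EventFull now stores one blob per event and Mergeable::drain_into moves a contiguous index range into the destination batch while keeping its running payload maximum up to date. A simplified, std-only sketch with two parallel deques standing in for the full set of columns:

    use std::collections::VecDeque;

    // Simplified stand-in for EventFull: parallel deques indexed together.
    #[derive(Debug, Default)]
    struct Batch {
        tss: VecDeque<u64>,
        blobs: VecDeque<Vec<u8>>,
        entry_payload_max: u64,
    }

    impl Batch {
        // Move events [beg, end) into dst, keeping dst's running payload maximum.
        fn drain_into(&mut self, dst: &mut Batch, beg: usize, end: usize) {
            for i in beg..end {
                dst.entry_payload_max = dst.entry_payload_max.max(self.blobs[i].len() as u64);
            }
            dst.tss.extend(self.tss.drain(beg..end));
            dst.blobs.extend(self.blobs.drain(beg..end));
        }
    }

    fn main() {
        let mut a = Batch::default();
        for i in 0..4u64 {
            a.tss.push_back(i);
            a.blobs.push_back(vec![0u8; 10 * (i as usize + 1)]);
        }
        let mut b = Batch::default();
        a.drain_into(&mut b, 0, 2);
        assert_eq!(b.tss.len(), 2);
        assert_eq!(a.tss.len(), 2);
        assert_eq!(b.entry_payload_max, 20);
        println!("{b:?}");
    }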
dst.be.extend(self.be.drain(r.clone())); dst.shapes.extend(self.shapes.drain(r.clone())); @@ -213,3 +215,80 @@ impl Mergeable for EventFull { None } } + +#[derive(Debug, ThisError, Serialize, Deserialize)] +pub enum DecompError { + TooLittleInput, + BadCompresionBlockSize, + UnusedBytes, + BitshuffleError, +} + +fn decompress(databuf: &[u8], type_size: u32, ele_count_2: u64, ele_count_exp: u64) -> Result, DecompError> { + let ts1 = Instant::now(); + if databuf.len() < 12 { + return Err(DecompError::TooLittleInput); + } + let value_bytes = u64::from_be_bytes(databuf[0..8].try_into().unwrap()); + let block_size = u32::from_be_bytes(databuf[8..12].try_into().unwrap()); + trace2!( + "decompress len {} value_bytes {} block_size {}", + databuf.len(), + value_bytes, + block_size + ); + if block_size > 1024 * 32 { + return Err(DecompError::BadCompresionBlockSize); + } + let ele_count = value_bytes / type_size as u64; + trace2!( + "ele_count {} ele_count_2 {} ele_count_exp {}", + ele_count, + ele_count_2, + ele_count_exp + ); + let mut decomp = Vec::with_capacity(type_size as usize * ele_count as usize); + unsafe { + decomp.set_len(decomp.capacity()); + } + match bitshuffle_decompress(&databuf[12..], &mut decomp, ele_count as _, type_size as _, 0) { + Ok(c1) => { + if 12 + c1 != databuf.len() { + Err(DecompError::UnusedBytes) + } else { + let ts2 = Instant::now(); + let _dt = ts2.duration_since(ts1); + // TODO analyze the histo + //self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros()); + Ok(decomp) + } + } + Err(_) => Err(DecompError::BitshuffleError), + } +} + +impl EventFull { + pub fn data_raw(&self, i: usize) -> &[u8] { + &self.blobs[i] + } + + pub fn data_decompressed( + &self, + i: usize, + _scalar_type: &ScalarType, + shape: &Shape, + ) -> Result, DecompError> { + if let Some(comp) = &self.comps[i] { + match comp { + CompressionMethod::BitshuffleLZ4 => { + let type_size = self.scalar_types[i].bytes() as u32; + let ele_count = self.shapes[i].ele_count(); + decompress(&self.blobs[i], type_size, ele_count, shape.ele_count()) + } + } + } else { + // TODO use a Cow type. 
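The decompress helper above reads a 12-byte header in front of the bitshuffle/LZ4 payload: 8 big-endian bytes with the decompressed byte count, then 4 big-endian bytes with the block size, which is sanity-checked before allocating. A header-only sketch that stops before the actual bitshuffle_decompress call; the error strings are illustrative:

    // Parse only the 12-byte header that precedes the bitshuffle/LZ4 payload:
    // bytes 0..8  big-endian total number of value bytes after decompression,
    // bytes 8..12 big-endian bitshuffle block size.
    #[derive(Debug, PartialEq)]
    struct BlobHeader {
        value_bytes: u64,
        block_size: u32,
    }

    fn parse_blob_header(databuf: &[u8]) -> Result<BlobHeader, String> {
        if databuf.len() < 12 {
            return Err("too little input".into());
        }
        let value_bytes = u64::from_be_bytes(databuf[0..8].try_into().unwrap());
        let block_size = u32::from_be_bytes(databuf[8..12].try_into().unwrap());
        if block_size > 1024 * 32 {
            return Err(format!("bad compression block size {block_size}"));
        }
        Ok(BlobHeader { value_bytes, block_size })
    }

    fn main() {
        let mut buf = Vec::new();
        buf.extend_from_slice(&32u64.to_be_bytes()); // 32 bytes, i.e. 4 f64 values
        buf.extend_from_slice(&8192u32.to_be_bytes());
        buf.extend_from_slice(&[0u8; 4]); // compressed payload would follow here
        let hdr = parse_blob_header(&buf).unwrap();
        assert_eq!(hdr.value_bytes, 32);
        // element count = value_bytes / type_size, e.g. 32 / 8 = 4 for f64
        assert_eq!(hdr.value_bytes / 8, 4);
        println!("{hdr:?}");
    }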
+ Ok(self.blobs[i].clone()) + } + } +} diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index a9804bd..3a92bdc 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -36,10 +36,11 @@ use std::time::Duration; use timeunits::*; use url::Url; -pub const APP_JSON: &'static str = "application/json"; -pub const APP_JSON_LINES: &'static str = "application/jsonlines"; -pub const APP_OCTET: &'static str = "application/octet-stream"; -pub const ACCEPT_ALL: &'static str = "*/*"; +pub const APP_JSON: &str = "application/json"; +pub const APP_JSON_LINES: &str = "application/jsonlines"; +pub const APP_OCTET: &str = "application/octet-stream"; +pub const ACCEPT_ALL: &str = "*/*"; +pub const X_DAQBUF_REQID: &str = "x-daqbuffer-request-id"; pub const CONNECTION_STATUS_DIV: u64 = timeunits::DAY; pub const TS_MSP_GRID_UNIT: u64 = timeunits::SEC * 10; @@ -1100,6 +1101,14 @@ impl Shape { let ret = serde_json::from_str(s)?; Ok(ret) } + + pub fn ele_count(&self) -> u64 { + match self { + Shape::Scalar => 1, + Shape::Wave(n) => *n as u64, + Shape::Image(n, m) => *n as u64 * *m as u64, + } + } } impl AppendToUrl for Shape { @@ -2987,3 +2996,23 @@ mod test_parse { assert_eq!(a.get("shape").unwrap(), "Image(3, 4)"); } } + +pub struct ReqCtx { + reqid: String, +} + +impl ReqCtx { + pub fn new(reqid: S) -> std::sync::Arc + where + S: Into, + { + let ret = Self { reqid: reqid.into() }; + std::sync::Arc::new(ret) + } + + pub fn reqid(&self) -> &str { + &self.reqid + } +} + +pub type ReqCtxArc = std::sync::Arc; diff --git a/crates/nodenet/src/configquorum.rs b/crates/nodenet/src/configquorum.rs index fac370a..a16b4e4 100644 --- a/crates/nodenet/src/configquorum.rs +++ b/crates/nodenet/src/configquorum.rs @@ -86,6 +86,19 @@ pub async fn find_config_basics_quorum( ncc: &NodeConfigCached, ) -> Result, Error> { if let Some(_cfg) = &ncc.node.sf_databuffer { + let channel = if channel.name().is_empty() { + if let Some(_) = channel.series() { + let pgclient = dbconn::create_connection(&ncc.node_config.cluster.database).await?; + let pgclient = std::sync::Arc::new(pgclient); + dbconn::find_sf_channel_by_series(channel, pgclient) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))? + } else { + channel + } + } else { + channel + }; match find_sf_ch_config_quorum(channel, range, ncc).await? 
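Shape::ele_count added to netpod gives the per-event element count, which together with the scalar type size determines the expected decompression buffer size. A small standalone sketch; the inner integer types of the variants are simplified here:

    // Mirrors netpod::Shape::ele_count: elements per event, used to size
    // the decompression output buffer as type_size * ele_count.
    enum Shape {
        Scalar,
        Wave(u32),
        Image(u32, u32),
    }

    impl Shape {
        fn ele_count(&self) -> u64 {
            match self {
                Shape::Scalar => 1,
                Shape::Wave(n) => *n as u64,
                Shape::Image(n, m) => *n as u64 * *m as u64,
            }
        }
    }

    fn main() {
        let shape = Shape::Image(1024, 512);
        let type_size: u64 = 2; // e.g. u16 pixels
        let expected = type_size * shape.ele_count();
        assert_eq!(expected, 1024 * 512 * 2u64);
        println!("decompressed size: {expected} bytes");
    }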
{ Some(x) => Ok(Some(ChannelTypeConfigGen::SfDatabuffer(x))), None => Ok(None), diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index a3e1782..8642c3e 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -1,5 +1,7 @@ use crate::scylla::scylla_channel_event_stream; +use err::thiserror; use err::Error; +use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::on_sitemty_data; @@ -20,6 +22,7 @@ use items_2::inmem::InMemoryFrame; use netpod::histo::HistoLog2; use netpod::log::*; use netpod::NodeConfigCached; +use netpod::ReqCtxArc; use query::api4::events::EventsSubQuery; use query::api4::events::Frame1Parts; use std::net::SocketAddr; @@ -40,6 +43,9 @@ const TEST_BACKEND: &str = "testbackend-00"; #[cfg(test)] mod test; +#[derive(Debug, ThisError)] +pub enum NodeNetError {} + pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> { let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw); let lis = tokio::net::TcpListener::bind(addr).await?; @@ -70,6 +76,7 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { async fn make_channel_events_stream_data( subq: EventsSubQuery, + reqctx: ReqCtxArc, ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { if subq.backend() == TEST_BACKEND { @@ -129,24 +136,137 @@ async fn make_channel_events_stream_data( Err(e) } else { let cfg = subq.ch_conf().to_sf_databuffer()?; - Ok(disk::raw::conn::make_event_pipe(subq, cfg, ncc).await?) + Ok(disk::raw::conn::make_event_pipe(subq, cfg, reqctx, ncc).await?) } } async fn make_channel_events_stream( subq: EventsSubQuery, + reqctx: ReqCtxArc, ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { let empty = empty_events_dyn_ev(subq.ch_conf().scalar_type(), subq.ch_conf().shape())?; let empty = sitem_data(ChannelEvents::Events(empty)); - let stream = make_channel_events_stream_data(subq, ncc).await?; + let stream = make_channel_events_stream_data(subq, reqctx, ncc).await?; let ret = futures_util::stream::iter([empty]).chain(stream); let ret = Box::pin(ret); Ok(ret) } +async fn events_conn_handler_with_reqid( + mut netout: OwnedWriteHalf, + evq: EventsSubQuery, + ncc: &NodeConfigCached, +) -> Result<(), ConnErr> { + let reqctx = netpod::ReqCtx::new(evq.reqid()); + if evq.create_errors_contains("nodenet_parse_query") { + let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query"); + return Err((e, netout).into()); + } + let stream: Pin> + Send>> = if evq.is_event_blobs() { + // TODO support event blobs as transform + let fetch_info = match evq.ch_conf().to_sf_databuffer() { + Ok(x) => x, + Err(e) => return Err((e, netout).into()), + }; + match disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc).await { + Ok(stream) => { + let stream = stream.map(|x| Box::new(x) as _); + Box::pin(stream) + } + Err(e) => return Err((e, netout).into()), + } + } else { + match make_channel_events_stream(evq.clone(), reqctx, ncc).await { + Ok(stream) => { + if false { + // TODO wasm example + use wasmer::Value; + let wasm = b""; + let mut store = wasmer::Store::default(); + let module = wasmer::Module::new(&store, wasm).unwrap(); + let import_object = wasmer::imports! 
{}; + let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); + let add_one = instance.exports.get_function("event_transform").unwrap(); + let result = add_one.call(&mut store, &[Value::I32(42)]).unwrap(); + assert_eq!(result[0], Value::I32(43)); + } + let mut tr = match build_event_transform(evq.transform()) { + Ok(x) => x, + Err(e) => { + return Err((e, netout).into()); + } + }; + let stream = stream.map(move |x| { + let item = on_sitemty_data!(x, |x| { + let x: Box = Box::new(x); + let x = tr.0.transform(x); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) + }); + Box::new(item) as Box + }); + Box::pin(stream) + } + Err(e) => { + return Err((e, netout).into()); + } + } + }; + let mut stream = stream; + let mut buf_len_histo = HistoLog2::new(5); + while let Some(item) = stream.next().await { + let item = item.make_frame(); + match item { + Ok(buf) => { + buf_len_histo.ingest(buf.len() as u32); + match netout.write_all(&buf).await { + Ok(()) => { + // TODO collect timing information and send as summary in a stats item. + // TODO especially collect a distribution over the buf lengths that were send. + // TODO we want to see a reasonable batch size. + } + Err(e) => return Err((e, netout))?, + } + } + Err(e) => { + error!("events_conn_handler_inner_try sees error in stream: {e:?}"); + return Err((e, netout))?; + } + } + } + { + let item = LogItem { + node_ix: ncc.ix as _, + level: Level::INFO, + msg: format!("buf_len_histo: {:?}", buf_len_histo), + }; + let item: Sitemty = Ok(StreamItem::Log(item)); + let buf = match item.make_frame() { + Ok(k) => k, + Err(e) => return Err((e, netout))?, + }; + match netout.write_all(&buf).await { + Ok(()) => (), + Err(e) => return Err((e, netout))?, + } + } + let buf = match make_term_frame() { + Ok(k) => k, + Err(e) => return Err((e, netout))?, + }; + match netout.write_all(&buf).await { + Ok(()) => (), + Err(e) => return Err((e, netout))?, + } + match netout.flush().await { + Ok(()) => (), + Err(e) => return Err((e, netout))?, + } + Ok(()) +} + async fn events_get_input_frames(netin: OwnedReadHalf) -> Result, Error> { - let mut h = InMemoryFrameAsyncReadStream::new(netin, netpod::ByteSize::from_kb(1)); + let mut h = InMemoryFrameAsyncReadStream::new(netin, netpod::ByteSize::from_kb(8)); let mut frames = Vec::new(); while let Some(k) = h .next() @@ -207,7 +327,7 @@ async fn events_conn_handler_inner_try( ncc: &NodeConfigCached, ) -> Result<(), ConnErr> { let _ = addr; - let (netin, mut netout) = stream.into_split(); + let (netin, netout) = stream.into_split(); let frames = match events_get_input_frames(netin).await { Ok(x) => x, Err(e) => return Err((e, netout).into()), @@ -216,111 +336,10 @@ async fn events_conn_handler_inner_try( Ok(x) => x, Err(e) => return Err((e, netout).into()), }; - info!("events_parse_input_query {evq:?}"); - if evq.create_errors_contains("nodenet_parse_query") { - let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query"); - return Err((e, netout).into()); - } - let mut stream: Pin> + Send>> = if evq.is_event_blobs() { - // TODO support event blobs as transform - let fetch_info = match evq.ch_conf().to_sf_databuffer() { - Ok(x) => x, - Err(e) => return Err((e, netout).into()), - }; - match disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, ncc).await { - Ok(stream) => { - let stream = stream.map(|x| Box::new(x) as _); - Box::pin(stream) - } - Err(e) => return Err((e, netout).into()), - } - } else { - match make_channel_events_stream(evq.clone(), ncc).await { - 
Ok(stream) => { - if false { - // TODO wasm example - use wasmer::Value; - let wasm = b""; - let mut store = wasmer::Store::default(); - let module = wasmer::Module::new(&store, wasm).unwrap(); - let import_object = wasmer::imports! {}; - let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); - let add_one = instance.exports.get_function("event_transform").unwrap(); - let result = add_one.call(&mut store, &[Value::I32(42)]).unwrap(); - assert_eq!(result[0], Value::I32(43)); - } - let mut tr = match build_event_transform(evq.transform()) { - Ok(x) => x, - Err(e) => { - return Err((e, netout).into()); - } - }; - let stream = stream.map(move |x| { - let item = on_sitemty_data!(x, |x| { - let x: Box = Box::new(x); - let x = tr.0.transform(x); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) - }); - Box::new(item) as Box - }); - Box::pin(stream) - } - Err(e) => { - return Err((e, netout).into()); - } - } - }; - - let mut buf_len_histo = HistoLog2::new(5); - while let Some(item) = stream.next().await { - let item = item.make_frame(); - match item { - Ok(buf) => { - buf_len_histo.ingest(buf.len() as u32); - match netout.write_all(&buf).await { - Ok(()) => { - // TODO collect timing information and send as summary in a stats item. - // TODO especially collect a distribution over the buf lengths that were send. - // TODO we want to see a reasonable batch size. - } - Err(e) => return Err((e, netout))?, - } - } - Err(e) => { - error!("events_conn_handler_inner_try sees error in stream: {e:?}"); - return Err((e, netout))?; - } - } - } - { - let item = LogItem { - node_ix: ncc.ix as _, - level: Level::INFO, - msg: format!("buf_len_histo: {:?}", buf_len_histo), - }; - let item: Sitemty = Ok(StreamItem::Log(item)); - let buf = match item.make_frame() { - Ok(k) => k, - Err(e) => return Err((e, netout))?, - }; - match netout.write_all(&buf).await { - Ok(()) => (), - Err(e) => return Err((e, netout))?, - } - } - let buf = match make_term_frame() { - Ok(k) => k, - Err(e) => return Err((e, netout))?, - }; - match netout.write_all(&buf).await { - Ok(()) => (), - Err(e) => return Err((e, netout))?, - } - match netout.flush().await { - Ok(()) => (), - Err(e) => return Err((e, netout))?, - } - Ok(()) + debug!("events_conn_handler sees: {evq:?}"); + let reqid = evq.reqid(); + let span = tracing::info_span!("subreq", reqid = reqid); + events_conn_handler_with_reqid(netout, evq, ncc).instrument(span).await } async fn events_conn_handler_inner( diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index 635aa1a..33d3a42 100644 --- a/crates/nodenet/src/conn/test.rs +++ b/crates/nodenet/src/conn/test.rs @@ -97,7 +97,7 @@ fn raw_data_00() { ); let select = EventsSubQuerySelect::new(fetch_info.into(), range.into(), TransformQuery::default_events()); let settings = EventsSubQuerySettings::default(); - let qu = EventsSubQuery::from_parts(select, settings); + let qu = EventsSubQuery::from_parts(select, settings, "dummy".into()); let frame1 = Frame1Parts::new(qu.clone()); let query = EventQueryJsonStringFrame(serde_json::to_string(&frame1).unwrap()); let frame = sitem_data(query).make_frame()?; diff --git a/crates/parse/src/channelconfig.rs b/crates/parse/src/channelconfig.rs index 7ce704e..22fffe1 100644 --- a/crates/parse/src/channelconfig.rs +++ b/crates/parse/src/channelconfig.rs @@ -1,7 +1,4 @@ -use chrono::DateTime; -use chrono::Utc; -use err::thiserror; -use err::Error; +use err::*; use netpod::log::*; use netpod::range::evrange::NanoRange; use 
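events_conn_handler now wraps the per-connection work in a tracing span that carries the request id, so every log line of the sub-request can be correlated with the originating HTTP request. A sketch of that instrumentation; tracing-subscriber is assumed only to make the example runnable:

    use tracing::Instrument;

    async fn handle_request() {
        // Every event emitted inside the instrumented future carries the
        // reqid field of the enclosing span.
        tracing::info!("handling request");
    }

    #[tokio::main]
    async fn main() {
        tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init();
        let reqid = "a1b2c3d4".to_string();
        let span = tracing::info_span!("subreq", reqid = reqid.as_str());
        handle_request().instrument(span).await;
    }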
netpod::timeunits::DAY; @@ -32,57 +29,43 @@ use tokio::io::ErrorKind; const TEST_BACKEND: &str = "testbackend-00"; -#[derive(Debug, thiserror::Error)] -#[error("ConfigParseError")] +#[derive(Debug, ThisError)] +// #[error("ConfigParseError")] pub enum ConfigParseError { NotSupportedOnNode, FileNotFound, PermissionDenied, IO, - ParseError, + ParseError(String), NotSupported, } -#[derive(Debug)] -pub struct NErr { - msg: String, -} - -impl From> for NErr { +impl From> for ConfigParseError { fn from(k: nom::Err) -> Self { - Self { - msg: format!("nom::Err {:?}", k), - } + let msg = format!("nom::Err {:?}", k); + Self::ParseError(msg) } } -impl nom::error::ParseError for NErr { +impl nom::error::ParseError for ConfigParseError { fn from_error_kind(_input: I, kind: nom::error::ErrorKind) -> Self { - Self { - msg: format!("ParseError {:?}", kind), - } + let msg = format!("ParseError kind {:?}", kind); + Self::ParseError(msg) } fn append(_input: I, kind: nom::error::ErrorKind, other: Self) -> Self { - Self { - msg: format!("ParseError kind {:?} other {:?}", kind, other), - } + let msg = format!("ParseError kind {:?} other {:?}", kind, other); + Self::ParseError(msg) } } -impl From for Error { - fn from(x: NErr) -> Self { - Self::with_msg_no_trace(x.msg) - } -} - -type NRes<'a, O> = nom::IResult<&'a [u8], O, NErr>; +type NRes<'a, O> = nom::IResult<&'a [u8], O, ConfigParseError>; fn mkerr<'a, S, O>(msg: S) -> NRes<'a, O> where S: Into, { - let e = NErr { msg: msg.into() }; + let e = ConfigParseError::ParseError(msg.into()); Err(nom::Err::Error(e)) } @@ -287,7 +270,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes> { } /// Parse a complete configuration file from given in-memory input buffer. -pub fn parse_config(inp: &[u8]) -> NRes { +fn parse_config_inner(inp: &[u8]) -> NRes { let (inp, ver) = be_i16(inp)?; let (inp, len1) = be_i32(inp)?; if len1 <= 8 || len1 > 500 { @@ -325,6 +308,11 @@ pub fn parse_config(inp: &[u8]) -> NRes { Ok((inp, ret)) } +pub fn parse_config(inp: &[u8]) -> Result { + let (_inp, ret) = parse_config_inner(inp).map_err(|e| ConfigParseError::ParseError(e.to_string()))?; + Ok(ret) +} + async fn read_local_config_real( channel: SfDbChannel, ncc: &NodeConfigCached, @@ -340,10 +328,7 @@ async fn read_local_config_real( .join("latest") .join("00000_Config"); match tokio::fs::read(&path).await { - Ok(buf) => { - let config = parse_config(&buf).map_err(|_| ConfigParseError::ParseError)?; - Ok(config.1) - } + Ok(buf) => parse_config(&buf), Err(e) => match e.kind() { ErrorKind::NotFound => Err(ConfigParseError::FileNotFound), ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied), @@ -579,13 +564,13 @@ mod test { #[test] fn parse_dummy() { let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap(); - assert_eq!(0, config.1.format_version); - assert_eq!("abc", config.1.channel_name); + assert_eq!(0, config.format_version); + assert_eq!("abc", config.channel_name); } #[test] fn open_file() { - let config = parse_config(&read_data()).unwrap().1; + let config = parse_config(&read_data()).unwrap(); assert_eq!(config.format_version, 0); assert_eq!(config.entries.len(), 18); for e in &config.entries { diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index c647ff4..854e309 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -383,14 +383,16 @@ pub struct EventsSubQuery { select: EventsSubQuerySelect, settings: EventsSubQuerySettings, ty: String, + reqid: String, } impl 
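ConfigParseError now implements nom's ParseError directly, so the parsers can return it without the intermediate NErr type, and parse_config flattens the nom result for callers. A reduced sketch of that wiring with a hypothetical ConfigErr and a single be_u16 field parse:

    use nom::error::{ErrorKind, ParseError};

    // Simplified stand-in for ConfigParseError: a nom-compatible error type,
    // so parser combinators can return it directly.
    #[derive(Debug)]
    enum ConfigErr {
        Parse(String),
    }

    impl<I> ParseError<I> for ConfigErr {
        fn from_error_kind(_input: I, kind: ErrorKind) -> Self {
            ConfigErr::Parse(format!("ParseError kind {:?}", kind))
        }
        fn append(_input: I, kind: ErrorKind, other: Self) -> Self {
            ConfigErr::Parse(format!("ParseError kind {:?} other {:?}", kind, other))
        }
    }

    impl From<nom::Err<ConfigErr>> for ConfigErr {
        fn from(e: nom::Err<ConfigErr>) -> Self {
            ConfigErr::Parse(format!("nom::Err {:?}", e))
        }
    }

    type NRes<'a, O> = nom::IResult<&'a [u8], O, ConfigErr>;

    fn be_u16_hdr(inp: &[u8]) -> NRes<u16> {
        nom::number::complete::be_u16(inp)
    }

    fn main() {
        let data = [0u8, 11, 0x61];
        let (rest, ver) = be_u16_hdr(&data[..]).unwrap();
        assert_eq!(ver, 11);
        assert_eq!(rest, &[0x61u8][..]);
        // A failed parse converts into the flat error type via From<nom::Err<_>>.
        let err: ConfigErr = be_u16_hdr(&[1u8][..]).unwrap_err().into();
        println!("{err:?}");
    }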
EventsSubQuery { - pub fn from_parts(select: EventsSubQuerySelect, settings: EventsSubQuerySettings) -> Self { + pub fn from_parts(select: EventsSubQuerySelect, settings: EventsSubQuerySettings, reqid: String) -> Self { Self { select, settings, ty: "EventsSubQuery".into(), + reqid, } } @@ -431,7 +433,8 @@ impl EventsSubQuery { } pub fn inmem_bufcap(&self) -> ByteSize { - ByteSize::from_kb(4) + // TODO should depend on the type of backend: only imagebuffer needs large size. + ByteSize::from_kb(1024 * 30) } // A rough indication on how many bytes this request is allowed to return. Otherwise, the result should @@ -455,6 +458,10 @@ impl EventsSubQuery { pub fn create_errors_contains(&self, x: &str) -> bool { self.settings.create_errors.contains(&String::from(x)) } + + pub fn reqid(&self) -> &str { + &self.reqid + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index 5acd7ac..bbade7c 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -24,12 +24,13 @@ use std::time::Instant; pub async fn plain_events_json( evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen, + reqid: String, cluster: &Cluster, ) -> Result { info!("plain_events_json evquery {:?}", evq); let select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone()); let settings = EventsSubQuerySettings::from(evq); - let subq = EventsSubQuery::from_parts(select, settings); + let subq = EventsSubQuery::from_parts(select, settings, reqid); // TODO remove magic constant let deadline = Instant::now() + evq.timeout(); let mut tr = build_merged_event_transform(evq.transform())?; diff --git a/crates/streams/src/slidebuf.rs b/crates/streams/src/slidebuf.rs index aa25174..e595394 100644 --- a/crates/streams/src/slidebuf.rs +++ b/crates/streams/src/slidebuf.rs @@ -3,7 +3,7 @@ use std::fmt; #[derive(Debug)] pub enum Error { NotEnoughBytes, - NotEnoughSpace, + NotEnoughSpace(usize, usize, usize), TryFromSliceError, } @@ -97,7 +97,7 @@ impl SlideBuf { self.rewind(); } if self.wcap() < x { - return Err(Error::NotEnoughSpace); + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), x)); } else { self.wp += x; Ok(()) @@ -260,7 +260,7 @@ impl SlideBuf { self.rewind(); } if self.wcap() < n { - Err(Error::NotEnoughSpace) + Err(Error::NotEnoughSpace(self.cap(), self.wcap(), n)) } else { let ret = &mut self.buf[self.wp..self.wp + n]; self.wp += n; @@ -293,7 +293,7 @@ impl SlideBuf { self.rewind(); } if self.wcap() < need_min { - Err(Error::NotEnoughSpace) + Err(Error::NotEnoughSpace(self.cap(), self.wcap(), need_min)) } else { let ret = &mut self.buf[self.wp..]; Ok(ret) @@ -307,7 +307,7 @@ impl SlideBuf { self.rewind(); } if self.wcap() < buf.len() { - return Err(Error::NotEnoughSpace); + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), buf.len())); } else { self.buf[self.wp..self.wp + buf.len()].copy_from_slice(buf); self.wp += buf.len(); @@ -324,7 +324,7 @@ impl SlideBuf { self.rewind(); } if self.wcap() < TS { - return Err(Error::NotEnoughSpace); + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); } else { self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); self.wp += TS; @@ -341,7 +341,7 @@ impl SlideBuf { self.rewind(); } if self.wcap() < TS { - return Err(Error::NotEnoughSpace); + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); } else { self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); self.wp += TS; @@ -358,7 +358,7 @@ 
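SlideBuf's NotEnoughSpace now carries the total capacity, the remaining writable capacity and the requested length, which makes the failure diagnosable from the log line alone. A sketch of an error variant with that context; named fields are used here instead of the tuple variant for readability:

    use std::fmt;

    // Carrying capacity, writable space and the requested length in the error
    // makes "not enough space" reports actionable without a debugger.
    #[derive(Debug)]
    enum BufError {
        NotEnoughSpace { cap: usize, wcap: usize, need: usize },
    }

    impl fmt::Display for BufError {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            match self {
                BufError::NotEnoughSpace { cap, wcap, need } => {
                    write!(f, "not enough space: cap {} writable {} needed {}", cap, wcap, need)
                }
            }
        }
    }

    fn reserve(cap: usize, wcap: usize, need: usize) -> Result<(), BufError> {
        if wcap < need {
            Err(BufError::NotEnoughSpace { cap, wcap, need })
        } else {
            Ok(())
        }
    }

    fn main() {
        match reserve(1024, 16, 64) {
            Ok(()) => println!("ok"),
            Err(e) => println!("{e}"),
        }
    }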
impl SlideBuf { self.rewind(); } if self.wcap() < TS { - return Err(Error::NotEnoughSpace); + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); } else { self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); self.wp += TS; @@ -375,7 +375,7 @@ impl SlideBuf { self.rewind(); } if self.wcap() < TS { - return Err(Error::NotEnoughSpace); + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); } else { self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); self.wp += TS; @@ -392,7 +392,7 @@ impl SlideBuf { self.rewind(); } if self.wcap() < TS { - return Err(Error::NotEnoughSpace); + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); } else { self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); self.wp += TS; @@ -409,7 +409,7 @@ impl SlideBuf { self.rewind(); } if self.wcap() < TS { - return Err(Error::NotEnoughSpace); + return Err(Error::NotEnoughSpace(self.cap(), self.wcap(), TS)); } else { self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); self.wp += TS; diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 90e9d84..82efba8 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -41,11 +41,12 @@ async fn timebinnable_stream( range: NanoRange, one_before_range: bool, ch_conf: ChannelTypeConfigGen, + reqid: String, cluster: Cluster, ) -> Result { let select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone()); let settings = EventsSubQuerySettings::from(&query); - let subq = EventsSubQuery::from_parts(select, settings); + let subq = EventsSubQuery::from_parts(select, settings, reqid); let mut tr = build_merged_event_transform(subq.transform())?; let inps = open_tcp_streams::(subq, &cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. @@ -72,6 +73,7 @@ async fn timebinned_stream( query: BinnedQuery, binned_range: BinnedRangeEnum, ch_conf: ChannelTypeConfigGen, + reqid: String, cluster: Cluster, ) -> Result>> + Send>>, Error> { let range = binned_range.binned_range_time().to_nano_range(); @@ -79,7 +81,7 @@ async fn timebinned_stream( let do_time_weight = true; let one_before_range = true; - let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, cluster).await?; + let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, reqid, cluster).await?; let stream: Pin> = stream.0; let stream = Box::pin(stream); // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. @@ -105,12 +107,13 @@ fn timebinned_to_collectable( pub async fn timebinned_json( query: BinnedQuery, ch_conf: ChannelTypeConfigGen, + reqid: String, cluster: Cluster, ) -> Result { let deadline = Instant::now().checked_add(query.timeout_value()).unwrap(); let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; let collect_max = 10000; - let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, cluster).await?; + let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, reqid, cluster).await?; let stream = timebinned_to_collectable(stream); let collected = Collect::new(stream, deadline, collect_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected);