From 9e3395bf131b01ed8904337ce22c9255ea8807ea Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 15 Jun 2022 14:27:38 +0200
Subject: [PATCH] Refactoring: unify plain-events query types, add
 RawEventsQuery constructor, move http body streaming helpers

Merge PlainEventsBinaryQuery and PlainEventsJsonQuery into a single
PlainEventsQuery, introduce RawEventsQuery::new with serde defaults for
the tuning and test-error fields, move response, BodyStream and
ToPublicResponse out of httpret.rs into the new httpret::bodystream
module, report handler errors back over the network in nodenet, and
include the payload in the CRC of error and term frames.

---
 daqbufp2/src/test/binnedjson.rs  |   4 +-
 daqbufp2/src/test/events.rs      |   8 +-
 dbconn/Cargo.toml                |   5 +-
 dbconn/src/{lib.rs => dbconn.rs} |   0
 dbconn/src/scan.rs               |  27 +++--
 disk/src/binned.rs               |  32 +++---
 disk/src/binned/pbv.rs           |  34 ++++--
 disk/src/cache.rs                |   3 +
 disk/src/channelexec.rs          |  52 +++-------
 disk/src/events.rs               | 136 +++++++-----------------
 disk/src/frame/inmem.rs          |   7 +-
 dq/src/dq.rs                     |  12 +--
 httpret/src/api1.rs              |   9 +-
 httpret/src/bodystream.rs        | 135 ++++++++++++++++++++++++
 httpret/src/channelconfig.rs     |  69 +++++++-----
 httpret/src/events.rs            |  16 ++-
 httpret/src/evinfo.rs            |  22 ++--
 httpret/src/httpret.rs           | 173 ++++---------------------------
 httpret/src/proxy.rs             |   4 +-
 items/src/frame.rs               |  23 +++-
 items/src/lib.rs                 |   3 +
 netpod/src/netpod.rs             |   6 ++
 netpod/src/query.rs              |  20 ++++
 nodenet/src/conn.rs              |  21 +++-
 nodenet/src/scylla.rs            |  29 ++++--
 25 files changed, 429 insertions(+), 421 deletions(-)
 rename dbconn/src/{lib.rs => dbconn.rs} (100%)
 create mode 100644 httpret/src/bodystream.rs

diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs
index 2785636..cd714dc 100644
--- a/daqbufp2/src/test/binnedjson.rs
+++ b/daqbufp2/src/test/binnedjson.rs
@@ -3,7 +3,7 @@ mod channelarchiver;
 use crate::err::ErrConv;
 use crate::nodes::{require_sls_test_host_running, require_test_hosts_running};
 use chrono::{DateTime, Utc};
-use disk::events::PlainEventsJsonQuery;
+use disk::events::PlainEventsQuery;
 use err::Error;
 use http::StatusCode;
 use hyper::Body;
@@ -422,7 +422,7 @@ async fn get_events_json_common_res(
     let beg_date: DateTime<Utc> = beg_date.parse()?;
     let end_date: DateTime<Utc> = end_date.parse()?;
     let range = NanoRange::from_date_time(beg_date, end_date);
-    let mut query = PlainEventsJsonQuery::new(channel, range, 4096, None, false);
+    let mut query = PlainEventsQuery::new(channel, range, 4096, None, false);
     query.set_timeout(Duration::from_millis(15000));
     let mut url = Url::parse(&format!("http://{}:{}/api/4/events", node0.host, node0.port))?;
     query.append_to_url(&mut url);
diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs
index 3d9e7ed..c5a2ef0 100644
--- a/daqbufp2/src/test/events.rs
+++ b/daqbufp2/src/test/events.rs
@@ -1,7 +1,7 @@
 use crate::err::ErrConv;
 use crate::nodes::require_test_hosts_running;
 use chrono::{DateTime, Utc};
-use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
+use disk::events::PlainEventsQuery;
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use disk::streamlog::Streamlog;
 use err::Error;
@@ -12,7 +12,7 @@ use items::numops::NumOps;
 use items::scalarevents::ScalarEvents;
 use items::{FrameType, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen};
 use netpod::log::*;
-use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET};
+use netpod::{Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET};
 use serde_json::Value as JsonValue;
 use std::fmt::Debug;
 use std::future::ready;
@@ -65,7 +65,7 @@ where
         series: None,
     };
     let range = NanoRange::from_date_time(beg_date, end_date);
-    let query = PlainEventsBinaryQuery::new(channel, range, 1024 * 4);
+    let query = PlainEventsQuery::new(channel, range, 1024 * 4, None, false);
     let hp = HostPort::from_node(node0);
     let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?;
     query.append_to_url(&mut url);
@@ -276,7 +276,7 @@ pub async fn get_plain_events_json(
         series: None,
     };
     let range = NanoRange::from_date_time(beg_date, end_date);
-    let query = PlainEventsJsonQuery::new(channel, range, 1024 * 4, None, false);
+    let query = PlainEventsQuery::new(channel, range, 1024 * 4, None, false);
     let hp = HostPort::from_node(node0);
     let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?;
     query.append_to_url(&mut url);
diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml
index d93b2e4..6d6c810 100644
--- a/dbconn/Cargo.toml
+++ b/dbconn/Cargo.toml
@@ -2,7 +2,10 @@
 name = "dbconn"
 version = "0.0.1-a.0"
 authors = ["Dominik Werder "]
-edition = "2018"
+edition = "2021"
+
+[lib]
+path = "src/dbconn.rs"
 
 [dependencies]
 serde = { version = "1.0", features = ["derive"] }
diff --git a/dbconn/src/lib.rs b/dbconn/src/dbconn.rs
similarity index 100%
rename from dbconn/src/lib.rs
rename to dbconn/src/dbconn.rs
diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs
index 6ec0036..26f6364 100644
--- a/dbconn/src/scan.rs
+++ b/dbconn/src/scan.rs
@@ -1,9 +1,9 @@
-#![allow(unused_imports)]
+use crate::{create_connection, delay_io_medium, delay_io_short, ErrConv};
 use async_channel::{bounded, Receiver};
 use chrono::{DateTime, Utc};
 use err::Error;
 use futures_core::Stream;
-use futures_util::{pin_mut, FutureExt, StreamExt};
+use futures_util::FutureExt;
 use netpod::log::*;
 use netpod::{Database, NodeConfigCached};
 use parse::channelconfig::NErr;
@@ -11,7 +11,6 @@ use pin_project::pin_project;
 use serde::{Deserialize, Serialize};
 use std::future::Future;
 use std::io::ErrorKind;
-use std::marker::PhantomPinned;
 use std::os::unix::ffi::OsStringExt;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
@@ -20,8 +19,6 @@
 use tokio::fs::{DirEntry, ReadDir};
 use tokio_postgres::Client;
 
-use crate::ErrConv;
-
 #[derive(Debug, Serialize, Deserialize)]
 pub struct NodeDiskIdent {
     pub rowid: i64,
@@ -214,7 +211,7 @@ impl UpdatedDbWithChannelNamesStream {
             channel_inp_done: false,
             clist: vec![],
         };
-        ret.client_fut = Some(Box::pin(crate::create_connection(
+        ret.client_fut = Some(Box::pin(create_connection(
             &ret.node_config_ref.node_config.cluster.database,
         )));
         Ok(ret)
@@ -326,7 +323,7 @@ impl Stream for UpdatedDbWithChannelNamesStream {
 }
 
 async fn update_db_with_channel_name_list(list: Vec<String>, backend: i64, dbc: &Client) -> Result<(), Error> {
-    crate::delay_io_short().await;
+    delay_io_short().await;
     dbc.query("begin", &[]).await.err_conv()?;
     for ch in list {
         dbc.query(
@@ -348,7 +345,7 @@ pub async fn update_db_with_channel_names(
     let tx2 = tx.clone();
     let db_config = db_config.clone();
     let block1 = async move {
-        let dbc = crate::create_connection(&db_config).await?;
+        let dbc = create_connection(&db_config).await?;
         let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
         let c1 = Arc::new(RwLock::new(0u32));
         dbc.query("begin", &[]).await.err_conv()?;
@@ -386,7 +383,7 @@ pub async fn update_db_with_channel_names(
                 count: c2,
             };
             tx.send(Ok(ret)).await.err_conv()?;
-            crate::delay_io_medium().await;
+            delay_io_medium().await;
             dbc.query("begin", &[]).await.err_conv()?;
         }
         Ok(())
@@ -459,7 +456,7 @@ pub async fn update_db_with_all_channel_configs(
     let tx3 = tx.clone();
     let block1 = async move {
         let node_config = &node_config;
-        let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?;
+        let dbc = create_connection(&node_config.node_config.cluster.database).await?;
         let dbc = Arc::new(dbc);
         let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
         let rows = dbc
@@ -490,11 +487,11 @@ pub async fn update_db_with_all_channel_configs(
             {
                 Err(e) => {
                     error!("{:?}", e);
-                    crate::delay_io_medium().await;
+                    delay_io_medium().await;
                 }
                 Ok(UpdateChannelConfigResult::NotFound) => {
                     warn!("can not find channel config {}", channel);
-                    crate::delay_io_medium().await;
+                    delay_io_medium().await;
                 }
                 Ok(UpdateChannelConfigResult::Done) => {
                     c1 += 1;
@@ -508,7 +505,7 @@ pub async fn update_db_with_all_channel_configs(
                         tx.send(Ok(ret)).await.err_conv()?;
                         dbc.query("begin", &[]).await.err_conv()?;
                     }
-                    crate::delay_io_short().await;
+                    delay_io_short().await;
                 }
             }
         }
@@ -550,7 +547,7 @@ pub async fn update_db_with_all_channel_configs(
 }
 
 pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), Error> {
-    let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?;
+    let dbc = create_connection(&node_config.node_config.cluster.database).await?;
     dbc.query("select update_cache()", &[]).await.err_conv()?;
     Ok(())
 }
@@ -655,7 +652,7 @@ pub async fn update_db_with_all_channel_datafiles(
     node_disk_ident: &NodeDiskIdent,
     ks_prefix: &str,
 ) -> Result<(), Error> {
-    let dbc = Arc::new(crate::create_connection(&node_config.node_config.cluster.database).await?);
+    let dbc = Arc::new(create_connection(&node_config.node_config.cluster.database).await?);
     let rows = dbc
         .query(
             "select rowid, facility, name from channels where facility = $1 order by facility, name",
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 6bf5c5f..0229587 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -112,16 +112,12 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
                     debug!(
                         "BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {range:?}"
                     );
-                    // TODO let BinnedQuery provide the DiskIoTune.
-                    let mut disk_io_tune = netpod::DiskIoTune::default();
-                    disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size() as usize;
-                    let evq = RawEventsQuery {
-                        channel: self.query.channel().clone(),
-                        range: self.query.range().clone(),
-                        agg_kind: self.query.agg_kind().clone(),
-                        disk_io_tune,
-                        do_decompress: true,
-                    };
+                    // TODO let BinnedQuery provide the DiskIoTune and pass to RawEventsQuery:
+                    let evq = RawEventsQuery::new(
+                        self.query.channel().clone(),
+                        self.query.range().clone(),
+                        self.query.agg_kind().clone(),
+                    );
                     let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
                     let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
                     let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(
@@ -369,16 +365,12 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
                 }
                 Ok(None) => {
                     debug!("BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {range:?}");
-                    // TODO let BinnedQuery provide the DiskIoTune.
-                    let mut disk_io_tune = netpod::DiskIoTune::default();
-                    disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size() as usize;
-                    let evq = RawEventsQuery {
-                        channel: self.query.channel().clone(),
-                        range: self.query.range().clone(),
-                        agg_kind: self.query.agg_kind().clone(),
-                        disk_io_tune,
-                        do_decompress: true,
-                    };
+                    // TODO let BinnedQuery provide the DiskIoTune and pass to RawEventsQuery:
+                    let evq = RawEventsQuery::new(
+                        self.query.channel().clone(),
+                        self.query.range().clone(),
+                        self.query.agg_kind().clone(),
+                    );
                     let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
                     let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
                     let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(
diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs
index e39816d..7fc3dfd 100644
--- a/disk/src/binned/pbv.rs
+++ b/disk/src/binned/pbv.rs
@@ -103,16 +103,12 @@ where
         Pin<Box<dyn Stream<Item = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>> + Send>>,
         Error,
     > {
-        // TODO let PreBinnedQuery provide the tune:
-        let mut disk_io_tune = netpod::DiskIoTune::default();
-        disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size();
-        let evq = RawEventsQuery {
-            channel: self.query.channel().clone(),
-            range: self.query.patch().patch_range(),
-            agg_kind: self.query.agg_kind().clone(),
-            disk_io_tune,
-            do_decompress: true,
-        };
+        // TODO let PreBinnedQuery provide the tune and pass to RawEventsQuery:
+        let evq = RawEventsQuery::new(
+            self.query.channel().clone(),
+            self.query.patch().patch_range(),
+            self.query.agg_kind().clone(),
+        );
         if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 {
             let msg = format!(
                 "Patch length inconsistency {} {}",
@@ -219,6 +215,20 @@ where
         }
         Ok(())
     }
+
+    fn poll_open_check_local_file(
+        self: &mut Self,
+        _fut: Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>,
+    ) -> (
+        Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>>,
+        Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>,
+    ) {
+        todo!()
+    }
+
+    fn _check_for_existing_cached_data(&mut self) -> Result<(), Error> {
+        todo!()
+    }
 }
 
 impl Stream for PreBinnedValueStream
@@ -367,6 +377,10 @@ where
                         }
                         Pending => Pending,
                     }
+                } else if let Some(fut) = self.open_check_local_file.take() {
+                    let (res, fut) = Self::poll_open_check_local_file(&mut self, fut);
+                    self.open_check_local_file = Some(fut);
+                    res
                 } else if let Some(fut) = self.open_check_local_file.as_mut() {
                     match fut.poll_unpin(cx) {
                         Ready(item) => {
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 357725b..a9f1594 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -77,6 +77,8 @@ impl AsyncRead for HttpBodyAsAsyncRead {
     }
 }
 
+// For file-based caching, this determined the node where the cache file is located.
+// No longer needed for scylla-based caching.
 pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 {
     let mut hash = tiny_keccak::Sha3::v256();
     hash.update(channel.backend.as_bytes());
@@ -158,6 +160,7 @@ pub struct WrittenPbCache {
     pub duration: Duration,
 }
 
+// TODO only used for old archiver
 pub async fn write_pb_cache_min_max_avg_scalar<T>(
     values: T,
     patch: PreBinnedPatchCoord,
diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs
index e3e4ddc..03773b1 100644
--- a/disk/src/channelexec.rs
+++ b/disk/src/channelexec.rs
@@ -3,6 +3,7 @@ use crate::decode::{
     BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
     LittleEndian, NumFromBytes,
 };
+use crate::events::PlainEventsQuery;
 use crate::merge::mergedfromremotes::MergedFromRemotes;
 use bytes::Bytes;
 use err::Error;
@@ -18,9 +19,7 @@ use items::{
 };
 use netpod::log::*;
 use netpod::query::RawEventsQuery;
-use netpod::{
-    AggKind, ByteOrder, Channel, ChannelConfigQuery, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape,
-};
+use netpod::{AggKind, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape};
 use serde::de::DeserializeOwned;
 use serde_json::Value as JsonValue;
 use std::fmt::Debug;
@@ -195,8 +194,8 @@
 
 pub async fn channel_exec<F>(
     f: F,
-    channel: &Channel,
-    range: &NanoRange,
+    _channel: &Channel,
+    _range: &NanoRange,
     scalar_type: ScalarType,
     shape: Shape,
     agg_kind: AggKind,
@@ -221,17 +220,15 @@ pub struct PlainEvents {
     channel: Channel,
     range: NanoRange,
     agg_kind: AggKind,
-    disk_io_buffer_size: usize,
     node_config: NodeConfigCached,
 }
 
 impl PlainEvents {
-    pub fn new(channel: Channel, range: NanoRange, disk_io_buffer_size: usize, node_config: NodeConfigCached) -> Self {
+    pub fn new(channel: Channel, range: NanoRange, node_config: NodeConfigCached) -> Self {
         Self {
             channel,
             range,
             agg_kind: AggKind::Plain,
-            disk_io_buffer_size,
             node_config,
         }
     }
@@ -265,16 +262,8 @@ impl ChannelExecFunction for PlainEvents {
         let _ = byte_order;
         let _ = event_value_shape;
         let perf_opts = PerfOpts { inmem_bufcap: 4096 };
-        // TODO let upstream provide DiskIoTune
-        let mut disk_io_tune = netpod::DiskIoTune::default();
-        disk_io_tune.read_buffer_len = self.disk_io_buffer_size;
-        let evq = RawEventsQuery {
-            channel: self.channel,
-            range: self.range,
-            agg_kind: self.agg_kind,
-            disk_io_tune,
-            do_decompress: true,
-        };
+        // TODO let upstream provide DiskIoTune and pass in RawEventsQuery:
+        let evq = RawEventsQuery::new(self.channel, self.range, self.agg_kind);
         let s = MergedFromRemotes::>::new(evq, perf_opts, self.node_config.node_config.cluster);
         let s = s.map(|item| Box::new(item) as Box<dyn Framable>);
         Ok(Box::pin(s))
@@ -286,10 +275,10 @@ impl ChannelExecFunction for PlainEvents {
 }
 
 pub struct PlainEventsJson {
+    query: PlainEventsQuery,
     channel: Channel,
     range: NanoRange,
     agg_kind: AggKind,
-    disk_io_buffer_size: usize,
     timeout: Duration,
     node_config: NodeConfigCached,
     events_max: u64,
@@ -298,19 +287,19 @@ pub struct PlainEventsJson {
 
 impl PlainEventsJson {
     pub fn new(
+        query: PlainEventsQuery,
         channel: Channel,
         range: NanoRange,
-        disk_io_buffer_size: usize,
         timeout: Duration,
         node_config: NodeConfigCached,
         events_max: u64,
         do_log: bool,
     ) -> Self {
         Self {
+            query,
            channel,
             range,
             agg_kind: AggKind::Plain,
-            disk_io_buffer_size,
             timeout,
             node_config,
             events_max,
@@ -373,6 +362,7 @@ where
                 }
             }
             StreamItem::Stats(item) => match item {
+                // TODO factor and simplify the stats collection:
                 items::StatsItem::EventDataReadStats(_) => {}
                 items::StatsItem::RangeFilterStats(_) => {}
                 items::StatsItem::DiskStats(item) => match item {
@@ -422,10 +412,10 @@ impl ChannelExecFunction for PlainEventsJson {
 
     fn exec<END, EVS, ENP>(
         self,
-        byte_order: END,
+        _byte_order: END,
         _scalar_type: ScalarType,
         _shape: Shape,
-        event_value_shape: EVS,
+        _event_value_shape: EVS,
         _events_node_proc: ENP,
     ) -> Result<Self::Output, Error>
     where
@@ -443,19 +433,11 @@ impl ChannelExecFunction for PlainEventsJson {
         Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: FrameType + Framable + DeserializeOwned,
     {
-        let _ = byte_order;
-        let _ = event_value_shape;
         let perf_opts = PerfOpts { inmem_bufcap: 4096 };
-        // TODO let upstream provide DiskIoTune
-        let mut disk_io_tune = netpod::DiskIoTune::default();
-        disk_io_tune.read_buffer_len = self.disk_io_buffer_size;
-        let evq = RawEventsQuery {
-            channel: self.channel,
-            range: self.range,
-            agg_kind: self.agg_kind,
-            disk_io_tune,
-            do_decompress: true,
-        };
+        // TODO let upstream provide DiskIoTune and set in RawEventsQuery.
+        let mut evq = RawEventsQuery::new(self.channel, self.range, self.agg_kind);
+        evq.do_test_main_error = self.query.do_test_main_error();
+        evq.do_test_stream_error = self.query.do_test_stream_error();
         let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster);
         let f = collect_plain_events_json(s, self.timeout, 0, self.events_max, self.do_log);
         let f = FutureExt::map(f, |item| match item {
diff --git a/disk/src/events.rs b/disk/src/events.rs
index 79e8446..2fd493a 100644
--- a/disk/src/events.rs
+++ b/disk/src/events.rs
@@ -7,101 +7,7 @@ use url::Url;
 
 // TODO move this query type out of this `binned` mod
 #[derive(Clone, Debug)]
-pub struct PlainEventsBinaryQuery {
-    channel: Channel,
-    range: NanoRange,
-    disk_io_buffer_size: usize,
-    report_error: bool,
-    timeout: Duration,
-}
-
-impl PlainEventsBinaryQuery {
-    pub fn new(channel: Channel, range: NanoRange, disk_io_buffer_size: usize) -> Self {
-        Self {
-            channel,
-            range,
-            disk_io_buffer_size,
-            report_error: false,
-            timeout: Duration::from_millis(10000),
-        }
-    }
-
-    pub fn from_url(url: &Url) -> Result<Self, Error> {
-        let pairs = get_url_query_pairs(url);
-        let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
-        let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
-        let ret = Self {
-            channel: channel_from_pairs(&pairs)?,
-            range: NanoRange {
-                beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
-                end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
-            },
-            disk_io_buffer_size: pairs
-                .get("diskIoBufferSize")
-                .map_or("4096", |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?,
-            report_error: pairs
-                .get("reportError")
-                .map_or("false", |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
-            timeout: pairs
-                .get("timeout")
-                .map_or("10000", |k| k)
-                .parse::<u64>()
-                .map(|k| Duration::from_millis(k))
-                .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
-        };
-        Ok(ret)
-    }
-
-    pub fn range(&self) -> &NanoRange {
-        &self.range
-    }
-
-    pub fn channel(&self) -> &Channel {
-        &self.channel
-    }
-
-    pub fn disk_io_buffer_size(&self) -> usize {
-        self.disk_io_buffer_size
-    }
-
-    pub fn report_error(&self) -> bool {
-        self.report_error
-    }
-
-    pub fn timeout(&self) -> Duration {
-        self.timeout
-    }
-
-    pub fn set_timeout(&mut self, k: Duration) {
-        self.timeout = k;
-    }
-}
-
-impl AppendToUrl for PlainEventsBinaryQuery {
-    fn append_to_url(&self, url: &mut Url) {
-        let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
-        channel_append_to_url(url, &self.channel);
-        let mut g = url.query_pairs_mut();
-        g.append_pair(
-            "begDate",
-            &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
-        );
-        g.append_pair(
-            "endDate",
-            &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
-        );
-        g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size));
-        g.append_pair("timeout", &format!("{}", self.timeout.as_millis()));
-    }
-}
-
-// TODO move this query type out of this `binned` mod
-#[derive(Clone, Debug)]
-pub struct PlainEventsJsonQuery {
+pub struct PlainEventsQuery {
     channel: Channel,
     range: NanoRange,
     disk_io_buffer_size: usize,
     report_error: bool,
@@ -109,9 +15,11 @@ pub struct PlainEventsQuery {
     timeout: Duration,
     events_max: Option<u64>,
     do_log: bool,
+    do_test_main_error: bool,
+    do_test_stream_error: bool,
 }
 
-impl PlainEventsJsonQuery {
+impl PlainEventsQuery {
     pub fn new(
         channel: Channel,
         range: NanoRange,
@@ -127,6 +35,8 @@ impl PlainEventsQuery {
             timeout: Duration::from_millis(10000),
             events_max,
             do_log,
+            do_test_main_error: false,
+            do_test_stream_error: false,
         }
     }
 
@@ -164,6 +74,16 @@ impl PlainEventsQuery {
                 .map_or("false", |k| k)
                 .parse()
                 .map_err(|e| Error::with_public_msg(format!("can not parse doLog {:?}", e)))?,
+            do_test_main_error: pairs
+                .get("doTestMainError")
+                .map_or("false", |k| k)
+                .parse()
+                .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError {:?}", e)))?,
+            do_test_stream_error: pairs
+                .get("doTestStreamError")
+                .map_or("false", |k| k)
+                .parse()
+                .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?,
         };
         Ok(ret)
     }
@@ -202,10 +122,26 @@ impl PlainEventsQuery {
         self.do_log
     }
 
+    pub fn do_test_main_error(&self) -> bool {
+        self.do_test_main_error
+    }
+
+    pub fn do_test_stream_error(&self) -> bool {
+        self.do_test_stream_error
+    }
+
     pub fn set_timeout(&mut self, k: Duration) {
         self.timeout = k;
     }
 
+    pub fn set_do_test_main_error(&mut self, k: bool) {
+        self.do_test_main_error = k;
+    }
+
+    pub fn set_do_test_stream_error(&mut self, k: bool) {
+        self.do_test_stream_error = k;
+    }
+
     pub fn append_to_url(&self, url: &mut Url) {
         let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
         channel_append_to_url(url, &self.channel);
@@ -227,25 +163,25 @@ impl PlainEventsQuery {
     }
 }
 
-impl HasBackend for PlainEventsJsonQuery {
+impl HasBackend for PlainEventsQuery {
     fn backend(&self) -> &str {
         &self.channel.backend
     }
 }
 
-impl HasTimeout for PlainEventsJsonQuery {
+impl HasTimeout for PlainEventsQuery {
     fn timeout(&self) -> Duration {
         self.timeout.clone()
     }
 }
 
-impl FromUrl for PlainEventsJsonQuery {
+impl FromUrl for PlainEventsQuery {
     fn from_url(url: &Url) -> Result<Self, Error> {
         Self::from_url(url)
     }
 }
 
-impl AppendToUrl for PlainEventsJsonQuery {
+impl AppendToUrl for PlainEventsQuery {
     fn append_to_url(&self, url: &mut Url) {
         self.append_to_url(url)
     }
diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs
index 6fcf968..0db0dd6 100644
--- a/disk/src/frame/inmem.rs
+++ b/disk/src/frame/inmem.rs
@@ -158,11 +158,14 @@ where
                     let frame_crc_ind = u32::from_le_bytes(*arrayref::array_ref![buf, INMEM_FRAME_HEAD + len as usize, 4]);
                     let payload_crc_ind = u32::from_le_bytes(*arrayref::array_ref![buf, 16, 4]);
+                    //info!("len {}", len);
+                    //info!("payload_crc_ind {}", payload_crc_ind);
+                    //info!("frame_crc_ind {}", frame_crc_ind);
                     let payload_crc_match = payload_crc_ind == payload_crc;
                     let frame_crc_match = frame_crc_ind == frame_crc;
-                    if !payload_crc_match || !frame_crc_match {
+                    if !frame_crc_match || !payload_crc_match {
                         let ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]);
-                        warn!("CRC mismatch A\n{ss:?}");
+                        warn!("CRC mismatch A frame_crc_match {frame_crc_match} payload_crc_match {payload_crc_match}\n{ss:?}");
                         return (
                             Some(Some(Err(Error::with_msg(format!(
                                 "InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}",
diff --git a/dq/src/dq.rs b/dq/src/dq.rs
index b0f5daa..3036a88 100644
--- a/dq/src/dq.rs
+++ b/dq/src/dq.rs
@@ -375,16 +375,14 @@ pub async fn convert(convert_params: ConvertParams) -> Result<(), Error> {
             channel.name()
         )));
     }
-    let evq = RawEventsQuery {
-        channel: channel.clone(),
-        range: NanoRange {
+    let evq = RawEventsQuery::new(
+        channel.clone(),
+        NanoRange {
             beg: u64::MIN,
             end: u64::MAX,
         },
-        agg_kind: AggKind::Plain,
-        disk_io_tune: netpod::DiskIoTune::default(),
-        do_decompress: true,
-    };
+        AggKind::Plain,
+    );
     let f1 = pbr.into_file();
     // TODO can the positioning-logic maybe re-use the pbr?
     let z = archapp::events::position_file_for_evq(f1, evq.clone(), fni.year).await?;
diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs
index eca3c17..5d030d8 100644
--- a/httpret/src/api1.rs
+++ b/httpret/src/api1.rs
@@ -534,6 +534,7 @@ pub struct DataApiPython3DataStream {
     chan_stream: Option<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>>,
     config_fut: Option> + Send>>>,
     disk_io_tune: DiskIoTune,
+    #[allow(unused)]
     do_decompress: bool,
     #[allow(unused)]
     event_count: u64,
@@ -730,13 +731,7 @@ impl Stream for DataApiPython3DataStream {
                                 };
                                 let channel = self.channels[self.chan_ix - 1].clone();
                                 debug!("found channel_config for {}: {:?}", channel.name, entry);
-                                let evq = RawEventsQuery {
-                                    channel,
-                                    range: self.range.clone(),
-                                    agg_kind: netpod::AggKind::EventBlobs,
-                                    disk_io_tune: self.disk_io_tune.clone(),
-                                    do_decompress: self.do_decompress,
-                                };
+                                let evq = RawEventsQuery::new(channel, self.range.clone(), netpod::AggKind::EventBlobs);
                                 let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 };
                                 // TODO is this a good place to decide this?
                                 let s = if self.node_config.node_config.cluster.is_central_storage {
diff --git a/httpret/src/bodystream.rs b/httpret/src/bodystream.rs
new file mode 100644
index 0000000..8f7d155
--- /dev/null
+++ b/httpret/src/bodystream.rs
@@ -0,0 +1,135 @@
+use crate::err::Error;
+use bytes::Bytes;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use http::HeaderMap;
+use http::{Response, StatusCode};
+use hyper::Body;
+use netpod::log::*;
+use netpod::APP_JSON;
+use std::panic::AssertUnwindSafe;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tracing::field::Empty;
+use tracing::{span, Level};
+
+fn proxy_mark() -> &'static str {
+    "7c5e408a"
+}
+
+pub fn response<T>(status: T) -> http::response::Builder
+where
+    http::StatusCode: std::convert::TryFrom<T>,
+    <http::StatusCode as std::convert::TryFrom<T>>::Error: Into<http::Error>,
+{
+    Response::builder()
+        .status(status)
+        .header("Access-Control-Allow-Origin", "*")
+        .header("Access-Control-Allow-Headers", "*")
+        .header("x-proxy-log-mark", proxy_mark())
+}
+
+pub struct BodyStream<S> {
+    inp: S,
+    desc: String,
+}
+
+impl<S, I> BodyStream<S>
+where
+    S: Stream<Item = Result<I, Error>> + Unpin + Send + 'static,
+    I: Into<Bytes> + Sized + 'static,
+{
+    pub fn new(inp: S, desc: String) -> Self {
+        Self { inp, desc }
+    }
+
+    pub fn wrapped(inp: S, desc: String) -> Body {
+        Body::wrap_stream(Self::new(inp, desc))
+    }
+}
+
+impl<S, I> Stream for BodyStream<S>
+where
+    S: Stream<Item = Result<I, Error>> + Unpin,
+    I: Into<Bytes> + Sized,
+{
+    type Item = Result<I, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let span1 = span!(Level::INFO, "httpret::BodyStream", desc = Empty);
+        span1.record("desc", &self.desc.as_str());
+        span1.in_scope(|| {
+            use Poll::*;
+            let t = std::panic::catch_unwind(AssertUnwindSafe(|| self.inp.poll_next_unpin(cx)));
+            match t {
+                Ok(r) => match r {
+                    Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
+                    Ready(Some(Err(e))) => {
+                        error!("body stream error: {e:?}");
+                        Ready(Some(Err(Error::from(e))))
+                    }
+                    Ready(None) => Ready(None),
+                    Pending => Pending,
+                },
+                Err(e) => {
+                    error!("panic caught in httpret::BodyStream: {e:?}");
+                    let e = Error::with_msg(format!("panic caught in httpret::BodyStream: {e:?}"));
+                    Ready(Some(Err(e)))
+                }
+            }
+        })
+    }
+}
+
+pub trait ToPublicResponse {
+    fn to_public_response(&self) -> Response<Body>;
+}
+
+impl ToPublicResponse for Error {
+    fn to_public_response(&self) -> Response<Body> {
+        self.0.to_public_response()
+    }
+}
+
+impl ToPublicResponse for ::err::Error {
+    fn to_public_response(&self) -> Response<Body> {
+        use err::Reason;
+        let e = self.to_public_error();
+        let status = match e.reason() {
+            Some(Reason::BadRequest) => StatusCode::BAD_REQUEST,
+            Some(Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
+            _ => StatusCode::INTERNAL_SERVER_ERROR,
+        };
+        let msg = match serde_json::to_string(&e) {
+            Ok(s) => s,
+            Err(_) => "can not serialize error".into(),
+        };
+        match response(status)
+            .header(http::header::ACCEPT, APP_JSON)
+            .body(Body::from(msg))
+        {
+            Ok(res) => res,
+            Err(e) => {
+                error!("can not generate http error response {e:?}");
+                let mut res = Response::new(Body::default());
+                *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
+                res
+            }
+        }
+    }
+}
+
+struct BodyStreamWrap(netpod::BodyStream);
+
+impl hyper::body::HttpBody for BodyStreamWrap {
+    type Data = bytes::Bytes;
+    type Error = ::err::Error;
+
+    fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        self.0.inner.poll_next_unpin(cx)
+    }
+
+    fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+        Poll::Ready(Ok(None))
+    }
+}
diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs
index 9abc71f..9ad9267 100644
--- a/httpret/src/channelconfig.rs
+++ b/httpret/src/channelconfig.rs
@@ -2,7 +2,7 @@ use crate::err::Error;
 use crate::{response, ToPublicResponse};
 use dbconn::create_connection;
 use disk::binned::query::PreBinnedQuery;
-use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
+use disk::events::PlainEventsQuery;
 use http::{Method, Request, Response, StatusCode};
 use hyper::Body;
 use netpod::log::*;
@@ -24,11 +24,11 @@ pub struct ChConf {
     pub shape: Shape,
 }
 
-pub async fn chconf_from_events_binary(_q: &PlainEventsBinaryQuery, _conf: &NodeConfigCached) -> Result<ChConf, Error> {
+pub async fn chconf_from_events_binary(_q: &PlainEventsQuery, _conf: &NodeConfigCached) -> Result<ChConf, Error> {
     err::todoval()
 }
 
-pub async fn chconf_from_events_json(q: &PlainEventsJsonQuery, ncc: &NodeConfigCached) -> Result<ChConf, Error> {
+pub async fn chconf_from_events_json(q: &PlainEventsQuery, ncc: &NodeConfigCached) -> Result<ChConf, Error> {
     if q.channel().backend != ncc.node_config.cluster.backend {
         warn!(
             "Mismatched backend {} VS {}",
@@ -843,36 +843,37 @@ impl ChannelFromSeries {
 
 #[derive(Clone, Debug, Deserialize)]
 pub struct IocForChannelQuery {
-    channel: String,
+    facility: String,
+    #[serde(rename = "channelName")]
+    channel_name: String,
 }
 
 impl FromUrl for IocForChannelQuery {
     fn from_url(url: &Url) -> Result<Self, Error> {
         let pairs = get_url_query_pairs(url);
-        let channel = pairs
+        let facility = pairs
+            .get("facility")
+            .ok_or_else(|| Error::with_public_msg_no_trace("missing facility"))?
+            .into();
+        let channel_name = pairs
             .get("channelName")
             .ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))?
             .into();
-        Ok(Self { channel })
+        Ok(Self { facility, channel_name })
     }
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct ChannelIoc {
-    channel: String,
-    ioc: String,
-}
-
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct IocForChannelRes {
-    channels: Vec<ChannelIoc>,
+    #[serde(rename = "iocAddr")]
+    ioc_addr: String,
 }
 
 pub struct IocForChannel {}
 
 impl IocForChannel {
     pub fn handler(req: &Request<Body>) -> Option<Self> {
-        if req.uri().path() == "/api/4/ioc/channel" {
+        if req.uri().path() == "/api/4/channel/ioc" {
             Some(Self {})
         } else {
             None
@@ -889,9 +890,16 @@ impl IocForChannel {
         if accept == APP_JSON || accept == ACCEPT_ALL {
             let url = Url::parse(&format!("dummy:{}", req.uri()))?;
             let q = IocForChannelQuery::from_url(&url)?;
-            let res = self.find(&q, node_config).await?;
-            let body = Body::from(serde_json::to_vec(&res)?);
-            Ok(response(StatusCode::OK).body(body)?)
+            match self.find(&q, node_config).await {
+                Ok(k) => {
+                    let body = Body::from(serde_json::to_vec(&k)?);
+                    Ok(response(StatusCode::OK).body(body)?)
+                }
+                Err(e) => {
+                    let body = Body::from(format!("{:?}", e.public_msg()));
+                    Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body)?)
+                }
+            }
         } else {
             Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
         }
@@ -900,13 +908,26 @@ impl IocForChannel {
         }
     }
 
-    async fn find(&self, q: &IocForChannelQuery, node_config: &NodeConfigCached) -> Result<IocForChannelRes, Error> {
-        // TODO implement lookup in postgres
-        let _ = q;
-        let _pgconn = create_connection(&node_config.node_config.cluster.database).await?;
-        let _facility = "scylla";
-        let ret = IocForChannelRes { channels: vec![] };
-        Ok(ret)
+    async fn find(
+        &self,
+        q: &IocForChannelQuery,
+        node_config: &NodeConfigCached,
+    ) -> Result<Option<IocForChannelRes>, Error> {
+        let dbconf = &node_config.node_config.cluster.database;
+        let pg_client = create_connection(dbconf).await?;
+        let rows = pg_client
+            .query(
+                "select addr from ioc_by_channel where facility = $1 and channel = $2",
+                &[&q.facility, &q.channel_name],
+            )
+            .await?;
+        if let Some(row) = rows.first() {
+            let ioc_addr = row.get(0);
+            let ret = IocForChannelRes { ioc_addr };
+            Ok(Some(ret))
+        } else {
+            Ok(None)
+        }
     }
 }
diff --git a/httpret/src/events.rs b/httpret/src/events.rs
index c7cce38..197e9d2 100644
--- a/httpret/src/events.rs
+++ b/httpret/src/events.rs
@@ -1,7 +1,7 @@
 use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json};
 use crate::err::Error;
 use crate::{response, response_err, BodyStream, ToPublicResponse};
-use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
+use disk::events::PlainEventsQuery;
 use futures_util::{StreamExt, TryStreamExt};
 use http::{Method, Request, Response, StatusCode};
 use hyper::Body;
@@ -52,14 +52,9 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
 async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     debug!("httpret plain_events_binary req: {:?}", req);
     let url = Url::parse(&format!("dummy:{}", req.uri()))?;
-    let query = PlainEventsBinaryQuery::from_url(&url)?;
+    let query = PlainEventsQuery::from_url(&url)?;
     let chconf = chconf_from_events_binary(&query, node_config).await?;
-    let op = disk::channelexec::PlainEvents::new(
-        query.channel().clone(),
-        query.range().clone(),
-        query.disk_io_buffer_size(),
-        node_config.clone(),
-    );
+    let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone());
     let s = disk::channelexec::channel_exec(
         op,
         query.channel(),
@@ -81,12 +76,13 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
 async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     info!("httpret plain_events_json req: {:?}", req);
     let (head, _body) = req.into_parts();
-    let query = PlainEventsJsonQuery::from_request_head(&head)?;
+    let query = PlainEventsQuery::from_request_head(&head)?;
     let chconf = chconf_from_events_json(&query, node_config).await?;
     let op = disk::channelexec::PlainEventsJson::new(
+        // TODO pass only the query, not channel, range again:
+        query.clone(),
         query.channel().clone(),
         query.range().clone(),
-        query.disk_io_buffer_size(),
         query.timeout(),
         node_config.clone(),
         query.events_max().unwrap_or(u64::MAX),
diff --git a/httpret/src/evinfo.rs b/httpret/src/evinfo.rs
index f335366..aed0359 100644
--- a/httpret/src/evinfo.rs
+++ b/httpret/src/evinfo.rs
@@ -9,7 +9,7 @@ use disk::decode::Endianness;
 use disk::decode::EventValueFromBytes;
 use disk::decode::EventValueShape;
 use disk::decode::NumFromBytes;
-use disk::events::PlainEventsJsonQuery;
+use disk::events::PlainEventsQuery;
 use disk::merge::mergedfromremotes::MergedFromRemotes;
 use futures_util::FutureExt;
 use futures_util::Stream;
@@ -58,7 +58,7 @@ impl EventInfoScan {
         }
         let (head, _body) = req.into_parts();
         let url = Url::parse(&format!("dummy:{}", head.uri))?;
-        let query = PlainEventsJsonQuery::from_url(&url)?;
+        let query = PlainEventsQuery::from_url(&url)?;
         let ret = match Self::exec(&query, node_config).await {
             Ok(stream) => {
                 //
@@ -71,7 +71,7 @@ impl EventInfoScan {
     }
 
     pub async fn exec(
-        query: &PlainEventsJsonQuery,
+        query: &PlainEventsQuery,
         node_config: &NodeConfigCached,
     ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
         let chconf = chconf_from_events_json(&query, node_config).await?;
@@ -95,14 +95,14 @@ impl EventInfoScan {
 }
 
 pub struct EvInfoFunc {
-    query: PlainEventsJsonQuery,
+    query: PlainEventsQuery,
     timeout: Duration,
     node_config: NodeConfigCached,
     events_max: u64,
 }
 
 impl EvInfoFunc {
-    pub fn new(query: PlainEventsJsonQuery, timeout: Duration, events_max: u64, node_config: NodeConfigCached) -> Self {
+    pub fn new(query: PlainEventsQuery, timeout: Duration, events_max: u64, node_config: NodeConfigCached) -> Self {
         Self {
             query,
             timeout,
@@ -149,16 +149,8 @@ impl ChannelExecFunction for EvInfoFunc {
         let _ = byte_order;
         let _ = event_value_shape;
         let perf_opts = PerfOpts { inmem_bufcap: 4096 };
-        // TODO let PlainEventsJsonQuery provide the tune
-        let mut disk_io_tune = netpod::DiskIoTune::default();
-        disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size();
-        let evq = RawEventsQuery {
-            channel: self.query.channel().clone(),
-            range: self.query.range().clone(),
-            agg_kind: AggKind::Plain,
-            disk_io_tune,
-            do_decompress: true,
-        };
+        // TODO let PlainEventsJsonQuery provide the tune and pass to RawEventsQuery:
+        let evq = RawEventsQuery::new(self.query.channel().clone(), self.query.range().clone(), AggKind::Plain);
 
         // TODO Use a Merged-From-Multiple-Local-Splits.
         // TODO Pass the read buffer size from query parameter: GPFS needs a larger buffer..
diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs
index e2806f2..4bcf022 100644
--- a/httpret/src/httpret.rs
+++ b/httpret/src/httpret.rs
@@ -1,4 +1,5 @@
 pub mod api1;
+pub mod bodystream;
 pub mod channelarchiver;
 pub mod channelconfig;
 pub mod download;
@@ -11,16 +12,16 @@ pub mod pulsemap;
 pub mod search;
 pub mod settings;
 
+use self::bodystream::{BodyStream, ToPublicResponse};
+use crate::bodystream::response;
 use crate::err::Error;
 use crate::gather::gather_get_json;
 use crate::pulsemap::UpdateTask;
-use bytes::Bytes;
 use channelconfig::{chconf_from_binned, ChConf};
 use disk::binned::query::PreBinnedQuery;
 use future::Future;
-use futures_core::Stream;
 use futures_util::{FutureExt, StreamExt, TryStreamExt};
-use http::{HeaderMap, Method, StatusCode};
+use http::{Method, StatusCode};
 use hyper::server::conn::AddrStream;
 use hyper::service::{make_service_fn, service_fn};
 use hyper::{server::Server, Body, Request, Response};
@@ -41,14 +42,9 @@
 use std::sync::{Once, RwLock, RwLockWriteGuard};
 use std::time::SystemTime;
 use std::{future, net, panic, pin, task};
 use task::{Context, Poll};
-use tracing::field::Empty;
 use tracing::Instrument;
 use url::Url;
 
-fn proxy_mark() -> &'static str {
-    "7c5e408a"
-}
-
 pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
     static STATUS_BOARD_INIT: Once = Once::new();
     STATUS_BOARD_INIT.call_once(|| {
@@ -390,123 +386,6 @@ pub fn api_1_docs(path: &str) -> Result<Response<Body>, Error> {
     Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
 }
 
-fn response<T>(status: T) -> http::response::Builder
-where
-    http::StatusCode: std::convert::TryFrom<T>,
-    <http::StatusCode as std::convert::TryFrom<T>>::Error: Into<http::Error>,
-{
-    Response::builder()
-        .status(status)
-        .header("Access-Control-Allow-Origin", "*")
-        .header("Access-Control-Allow-Headers", "*")
-        .header("x-proxy-log-mark", proxy_mark())
-}
-
-struct BodyStreamWrap(netpod::BodyStream);
-
-impl hyper::body::HttpBody for BodyStreamWrap {
-    type Data = bytes::Bytes;
-    type Error = ::err::Error;
-
-    fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Self::Data, Self::Error>>> {
-        self.0.inner.poll_next_unpin(cx)
-    }
-
-    fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
-        Poll::Ready(Ok(None))
-    }
-}
-
-struct BodyStream<S> {
-    inp: S,
-    desc: String,
-}
-
-impl<S, I> BodyStream<S>
-where
-    S: Stream<Item = Result<I, Error>> + Unpin + Send + 'static,
-    I: Into<Bytes> + Sized + 'static,
-{
-    pub fn new(inp: S, desc: String) -> Self {
-        Self { inp, desc }
-    }
-
-    pub fn wrapped(inp: S, desc: String) -> Body {
-        Body::wrap_stream(Self::new(inp, desc))
-    }
-}
-
-impl<S, I> Stream for BodyStream<S>
-where
-    S: Stream<Item = Result<I, Error>> + Unpin,
-    I: Into<Bytes> + Sized,
-{
-    type Item = Result<I, Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let span1 = span!(Level::INFO, "httpret::BodyStream", desc = Empty);
-        span1.record("desc", &self.desc.as_str());
-        span1.in_scope(|| {
-            use Poll::*;
-            let t = std::panic::catch_unwind(AssertUnwindSafe(|| self.inp.poll_next_unpin(cx)));
-            match t {
-                Ok(r) => match r {
-                    Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
-                    Ready(Some(Err(e))) => {
-                        error!("body stream error: {e:?}");
-                        Ready(Some(Err(Error::from(e))))
-                    }
-                    Ready(None) => Ready(None),
-                    Pending => Pending,
-                },
-                Err(e) => {
-                    error!("panic caught in httpret::BodyStream: {e:?}");
-                    let e = Error::with_msg(format!("panic caught in httpret::BodyStream: {e:?}"));
-                    Ready(Some(Err(e)))
-                }
-            }
-        })
-    }
-}
-
-trait ToPublicResponse {
-    fn to_public_response(&self) -> Response<Body>;
-}
-
-impl ToPublicResponse for Error {
-    fn to_public_response(&self) -> Response<Body> {
-        self.0.to_public_response()
-    }
-}
-
-impl ToPublicResponse for ::err::Error {
-    fn to_public_response(&self) -> Response<Body> {
-        use ::err::Reason;
-        let e = self.to_public_error();
-        let status = match e.reason() {
-            Some(Reason::BadRequest) => StatusCode::BAD_REQUEST,
-            Some(Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
-            _ => StatusCode::INTERNAL_SERVER_ERROR,
-        };
-        let msg = match serde_json::to_string(&e) {
-            Ok(s) => s,
-            Err(_) => "can not serialize error".into(),
-        };
-        match response(status)
-            .header(http::header::ACCEPT, APP_JSON)
-            .body(Body::from(msg))
-        {
-            Ok(res) => res,
-            Err(e) => {
-                error!("can not generate http error response {e:?}");
-                let mut res = Response::new(Body::default());
-                *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
-                res
-            }
-        }
-    }
-}
-
 pub struct StatusBoardAllHandler {}
 
 impl StatusBoardAllHandler {
@@ -530,7 +409,10 @@ impl StatusBoardAllHandler {
 async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     match binned_inner(req, node_config).await {
         Ok(ret) => Ok(ret),
-        Err(e) => Ok(e.to_public_response()),
+        Err(e) => {
+            error!("fn binned: {e:?}");
+            Ok(e.to_public_response())
+        }
     }
 }
 
@@ -556,20 +438,13 @@ async fn binned_binary(
     query: BinnedQuery,
     chconf: ChConf,
     node_config: &NodeConfigCached,
 ) -> Result<Response<Body>, Error> {
-    let ret = match disk::binned::binned_bytes_for_http(&query, chconf.scalar_type, chconf.shape, node_config).await {
-        Ok(s) => {
-            response(StatusCode::OK).body(BodyStream::wrapped(s.map_err(Error::from), format!("binned_binary")))?
-        }
-        Err(e) => {
-            if query.report_error() {
-                response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
-            } else {
-                error!("fn binned_binary: {:?}", e);
-                response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
-            }
-        }
-    };
-    Ok(ret)
+    let body_stream =
+        disk::binned::binned_bytes_for_http(&query, chconf.scalar_type, chconf.shape, node_config).await?;
+    let res = response(StatusCode::OK).body(BodyStream::wrapped(
+        body_stream.map_err(Error::from),
+        format!("binned_binary"),
+    ))?;
+    Ok(res)
 }
 
 async fn binned_json(
@@ -577,18 +452,12 @@ async fn binned_json(
     query: BinnedQuery,
     chconf: ChConf,
     node_config: &NodeConfigCached,
 ) -> Result<Response<Body>, Error> {
-    let ret = match disk::binned::binned_json(&query, chconf.scalar_type, chconf.shape, node_config).await {
-        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s.map_err(Error::from), format!("binned_json")))?,
-        Err(e) => {
-            if query.report_error() {
-                response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?
-            } else {
-                error!("fn binned_json: {:?}", e);
-                response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
-            }
-        }
-    };
-    Ok(ret)
+    let body_stream = disk::binned::binned_json(&query, chconf.scalar_type, chconf.shape, node_config).await?;
+    let res = response(StatusCode::OK).body(BodyStream::wrapped(
+        body_stream.map_err(Error::from),
+        format!("binned_json"),
+    ))?;
+    Ok(res)
 }
 
 async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
index 283058b..37bf848 100644
--- a/httpret/src/proxy.rs
+++ b/httpret/src/proxy.rs
@@ -5,7 +5,7 @@ use crate::err::Error;
 use crate::gather::{gather_get_json_generic, SubRes};
 use crate::pulsemap::MapPulseQuery;
 use crate::{api_1_docs, api_4_docs, response, response_err, Cont};
-use disk::events::PlainEventsJsonQuery;
+use disk::events::PlainEventsQuery;
 use futures_core::Stream;
 use futures_util::pin_mut;
 use http::{Method, StatusCode};
@@ -108,7 +108,7 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
     } else if path == "/api/4/search/channel" {
         Ok(api4::channel_search(req, proxy_config).await?)
     } else if path == "/api/4/events" {
-        Ok(proxy_single_backend_query::<PlainEventsJsonQuery>(req, proxy_config).await?)
+        Ok(proxy_single_backend_query::<PlainEventsQuery>(req, proxy_config).await?)
     } else if path.starts_with("/api/4/map/pulse/") {
         Ok(proxy_single_backend_query::<MapPulseQuery>(req, proxy_config).await?)
     } else if path == "/api/4/binned" {
diff --git a/items/src/frame.rs b/items/src/frame.rs
index c60781d..72ddbd6 100644
--- a/items/src/frame.rs
+++ b/items/src/frame.rs
@@ -4,12 +4,15 @@ use crate::{
 };
 use bytes::{BufMut, BytesMut};
 use err::Error;
+#[allow(unused)]
+use netpod::log::*;
 use serde::{de::DeserializeOwned, Serialize};
 
 pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
 where
     FT: FrameType + Serialize,
 {
+    //trace!("make_frame");
     if item.is_err() {
         make_error_frame(item.err().unwrap())
     } else {
@@ -21,6 +24,7 @@ pub fn make_frame_2<FT>(item: &FT, fty: u32) -> Result<BytesMut, Error>
 where
     FT: Serialize,
 {
+    //trace!("make_frame_2");
     match bincode::serialize(item) {
         Ok(enc) => {
             if enc.len() > u32::MAX as usize {
@@ -35,11 +39,15 @@ where
             buf.put_u32_le(fty);
             buf.put_u32_le(enc.len() as u32);
             buf.put_u32_le(payload_crc);
+            // TODO add padding to align to 8 bytes.
+ //trace!("enc len {}", enc.len()); + //trace!("payload_crc {}", payload_crc); buf.put(enc.as_ref()); let mut h = crc32fast::Hasher::new(); h.update(&buf); let frame_crc = h.finalize(); buf.put_u32_le(frame_crc); + //trace!("frame_crc {}", frame_crc); Ok(buf) } Err(e) => Err(e)?, @@ -47,10 +55,11 @@ where } pub fn make_error_frame(error: &::err::Error) -> Result { + //trace!("make_error_frame"); match bincode::serialize(error) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); - h.update(&[]); + h.update(&enc); let payload_crc = h.finalize(); let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); buf.put_u32_le(INMEM_FRAME_MAGIC); @@ -58,11 +67,15 @@ pub fn make_error_frame(error: &::err::Error) -> Result { buf.put_u32_le(ERROR_FRAME_TYPE_ID); buf.put_u32_le(enc.len() as u32); buf.put_u32_le(payload_crc); + // TODO add padding to align to 8 bytes. + //trace!("enc len {}", enc.len()); + //trace!("payload_crc {}", payload_crc); buf.put(enc.as_ref()); let mut h = crc32fast::Hasher::new(); h.update(&buf); let frame_crc = h.finalize(); buf.put_u32_le(frame_crc); + //trace!("frame_crc {}", frame_crc); Ok(buf) } Err(e) => Err(e)?, @@ -70,15 +83,19 @@ pub fn make_error_frame(error: &::err::Error) -> Result { } pub fn make_term_frame() -> BytesMut { + //trace!("make_term_frame"); + let enc = []; let mut h = crc32fast::Hasher::new(); - h.update(&[]); + h.update(&enc); let payload_crc = h.finalize(); let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(TERM_FRAME_TYPE_ID); - buf.put_u32_le(0); + buf.put_u32_le(enc.len() as u32); buf.put_u32_le(payload_crc); + // TODO add padding to align to 8 bytes. + buf.put(enc.as_ref()); let mut h = crc32fast::Hasher::new(); h.update(&buf); let frame_crc = h.finalize(); diff --git a/items/src/lib.rs b/items/src/lib.rs index 546aefe..f7c4410 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -20,6 +20,8 @@ use bytes::BytesMut; use chrono::{TimeZone, Utc}; use err::Error; use frame::make_error_frame; +#[allow(unused)] +use netpod::log::*; use netpod::timeunits::{MS, SEC}; use netpod::{log::Level, AggKind, EventDataReadStats, EventQueryJsonStringFrame, NanoRange, Shape}; use netpod::{DiskStats, RangeFilterStats}; @@ -282,6 +284,7 @@ where } fn make_frame(&self) -> Result { + //trace!("make_frame"); match self { Ok(_) => make_frame_2(self, T::FRAME_TYPE_ID), Err(e) => make_error_frame(e), diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 676d53d..a7655a7 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1711,6 +1711,12 @@ impl DiskIoTune { } } +impl Default for DiskIoTune { + fn default() -> Self { + Self::default() + } +} + pub fn channel_from_pairs(pairs: &BTreeMap) -> Result { let ret = Channel { backend: pairs diff --git a/netpod/src/query.rs b/netpod/src/query.rs index c22983b..0eaf46c 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -71,8 +71,28 @@ pub struct RawEventsQuery { pub channel: Channel, pub range: NanoRange, pub agg_kind: AggKind, + #[serde(default)] pub disk_io_tune: DiskIoTune, + #[serde(default)] pub do_decompress: bool, + #[serde(default)] + pub do_test_main_error: bool, + #[serde(default)] + pub do_test_stream_error: bool, +} + +impl RawEventsQuery { + pub fn new(channel: Channel, range: NanoRange, agg_kind: AggKind) -> Self { + Self { + channel, + range, + agg_kind, + disk_io_tune: DiskIoTune::default(), + do_decompress: false, + do_test_main_error: false, + do_test_stream_error: false, + } + } } 
 
 #[derive(Clone, Debug)]
diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs
index 7236510..b4463f5 100644
--- a/nodenet/src/conn.rs
+++ b/nodenet/src/conn.rs
@@ -31,7 +31,6 @@ pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error>
 }
 
 async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> {
-    //use tracing_futures::Instrument;
     let span1 = span!(Level::INFO, "events_conn_handler");
     let r = events_conn_handler_inner(stream, addr, &node_config)
         .instrument(span1)
@@ -39,7 +38,7 @@ async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: N
     match r {
         Ok(k) => Ok(k),
         Err(e) => {
-            error!("raw_conn_handler sees error: {:?}", e);
+            error!("events_conn_handler sees error: {:?}", e);
             Err(e)
         }
     }
@@ -53,8 +52,14 @@ async fn events_conn_handler_inner(
     match events_conn_handler_inner_try(stream, addr, node_config).await {
         Ok(_) => (),
         Err(ce) => {
-            // TODO pass errors over network.
-            error!("events_conn_handler_inner: {:?}", ce.err);
+            // Try to pass the error over the network.
+            // If that fails, give error to the caller.
+            let mut out = ce.netout;
+            let e = ce.err;
+            let buf = items::frame::make_error_frame(&e)?;
+            //type T = StreamItem>>;
+            //let buf = Err::(e).make_frame()?;
+            out.write_all(&buf).await?;
         }
     }
     Ok(())
@@ -118,11 +123,17 @@ async fn events_conn_handler_inner_try(
     };
     info!("events_conn_handler_inner_try evq {:?}", evq);
 
+    if evq.do_test_main_error {
+        let e = Error::with_msg(format!("Test error private message."))
+            .add_public_msg(format!("Test error PUBLIC message."));
+        return Err((e, netout).into());
+    }
+
     let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> =
         if let Some(conf) = &node_config.node_config.cluster.scylla {
             let scyco = conf;
             let dbconf = node_config.node_config.cluster.database.clone();
-            match make_scylla_stream(&evq, scyco, dbconf).await {
+            match make_scylla_stream(&evq, scyco, dbconf, evq.do_test_stream_error).await {
                 Ok(j) => j,
                 Err(e) => return Err((e, netout))?,
             }
diff --git a/nodenet/src/scylla.rs b/nodenet/src/scylla.rs
index 17f8c31..83fecc3 100644
--- a/nodenet/src/scylla.rs
+++ b/nodenet/src/scylla.rs
@@ -169,10 +169,16 @@ pub struct ScyllaFramableStream {
     shape: Option<Shape>,
     scy: Arc,
     pgclient: Arc,
+    do_test_stream_error: bool,
 }
 
 impl ScyllaFramableStream {
-    pub fn new(evq: &RawEventsQuery, scy: Arc, pgclient: Arc) -> Self {
+    pub fn new(
+        evq: &RawEventsQuery,
+        scy: Arc,
+        pgclient: Arc,
+        do_test_stream_error: bool,
+    ) -> Self {
         Self {
             state: FrState::New,
             series: evq.channel.series.unwrap(),
@@ -183,6 +189,7 @@ impl ScyllaFramableStream {
             shape: None,
             scy,
             pgclient,
+            do_test_stream_error,
         }
     }
 }
@@ -192,6 +199,13 @@ impl Stream for ScyllaFramableStream {
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
+        if self.do_test_stream_error {
+            let e = Error::with_msg(format!("Test PRIVATE STREAM error."))
+                .add_public_msg(format!("Test PUBLIC STREAM error."));
+            return Ready(Some(
+                Box::new(Err::>>, _>(e)) as _,
+            ));
+        }
         loop {
             break match self.state {
                 FrState::New => {
@@ -296,7 +310,7 @@ async fn find_series(series: u64, pgclient: Arc) -> Result<(ScalarType
 }
 
 async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result<Vec<u64>, Error> {
-    info!("find_ts_msp series {} {:?}", series, range);
+    trace!("find_ts_msp series {} {:?}", series, range);
     // TODO use prepared statements
     let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 1";
    let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?;
@@ -305,7 +319,7 @@ async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Res
         let row = row.err_conv()?;
         before.push(row.0 as u64);
     }
-    info!("FOUND BEFORE THE REQUESTED TIME: {} {:?}", before.len(), before);
+    trace!("FOUND BEFORE THE REQUESTED TIME: {} {:?}", before.len(), before);
     let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
     let res = scy
         .query(cql, (series, range.beg as i64, range.end as i64))
@@ -319,7 +333,7 @@ async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Res
         let row = row.err_conv()?;
         ret.push(row.0 as u64);
     }
-    info!("found in total {} rows {:?}", ret.len(), ret);
+    trace!("found in total {} rows", ret.len());
     Ok(ret)
 }
 
@@ -333,7 +347,7 @@ macro_rules! read_next_scalar_values {
         ) -> Result<ScalarEvents<$st>, Error> {
             type ST = $st;
             type SCYTY = $scyty;
-            info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
+            trace!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
             let _ts_lsp_max = if range.end <= ts_msp {
                 // TODO we should not be here...
             } else {
@@ -364,7 +378,7 @@ macro_rules! read_next_scalar_values {
                     ret.push(ts, pulse, value);
                 }
             }
-            info!(
+            trace!(
                 "found in total {} events  ts_msp {}  discarded {}",
                 ret.tss.len(),
                 ts_msp,
@@ -416,6 +430,7 @@ pub async fn make_scylla_stream(
     evq: &RawEventsQuery,
     scyco: &ScyllaConfig,
     dbconf: Database,
+    do_test_stream_error: bool,
 ) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
     info!("make_scylla_stream open scylla connection");
     // TODO reuse existing connection:
@@ -434,7 +449,7 @@ pub async fn make_scylla_stream(
     tokio::spawn(pgconn);
     let pgclient = Arc::new(pgclient);
     let scy = Arc::new(scy);
-    let res = Box::pin(ScyllaFramableStream::new(evq, scy, pgclient)) as _;
+    let res = Box::pin(ScyllaFramableStream::new(evq, scy, pgclient, do_test_stream_error)) as _;
     Ok(res)
 }
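
---

Note (not part of the commit): a minimal standalone sketch of why the new
#[serde(default)] attributes on RawEventsQuery matter for the JSON query
frames that nodes exchange. The struct below only mirrors the field names
from netpod/src/query.rs; the plain-string channel, the bare beg/end u64
pair, and the one-field DiskIoTune are simplified stand-ins for the real
netpod types, and serde/serde_json with the derive feature are assumed as
dependencies.

use serde::{Deserialize, Serialize};

// Stand-in for netpod::DiskIoTune; only the field relevant to the sketch.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DiskIoTune {
    read_buffer_len: usize,
}

// Mirrors the shape of netpod::query::RawEventsQuery after this patch.
#[derive(Debug, Serialize, Deserialize)]
struct RawEventsQuery {
    channel: String, // simplified; the real type is netpod::Channel
    beg: u64,        // simplified; the real type is netpod::NanoRange
    end: u64,
    #[serde(default)]
    disk_io_tune: DiskIoTune,
    #[serde(default)]
    do_decompress: bool,
    #[serde(default)]
    do_test_main_error: bool,
    #[serde(default)]
    do_test_stream_error: bool,
}

fn main() -> Result<(), serde_json::Error> {
    // A query serialized by an older node knows nothing about the tuning
    // or test-error fields; #[serde(default)] lets it still deserialize,
    // with the same values that RawEventsQuery::new fills in.
    let old = r#"{"channel":"some-backend/some-channel","beg":0,"end":10}"#;
    let q: RawEventsQuery = serde_json::from_str(old)?;
    assert!(!q.do_test_main_error && !q.do_test_stream_error);
    assert!(!q.do_decompress);
    println!("{q:?}");
    Ok(())
}

The same reasoning explains why RawEventsQuery::new defaults do_decompress
to false: callers that need decompression or a tuned read buffer are meant
to set those fields explicitly after construction, as the TODO comments at
the call sites in this patch indicate.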