diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 0f74e35..5caeb3b 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -257,6 +257,7 @@ macro_rules! arm1 { ($item:expr, $sty1:ident, $sty2:ident, $shape:expr, $ak:expr) => {{ match $shape { Shape::Scalar => match $ak { + AggKind::EventBlobs => panic!(), AggKind::Plain => arm2!( $item, EventValues, @@ -290,6 +291,7 @@ macro_rules! arm1 { ), }, Shape::Wave(_) => match $ak { + AggKind::EventBlobs => panic!(), AggKind::Plain => arm2!( $item, WaveEvents, diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs index 25b6f06..2bd4a08 100644 --- a/archapp/src/lib.rs +++ b/archapp/src/lib.rs @@ -142,6 +142,7 @@ fn _tmp1() { macro_rules! wagg1 { ($k:expr, $ak:expr, $shape:expr, $sty:ident) => { match $ak { + AggKind::EventBlobs => panic!(), AggKind::Plain => EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::$sty($k))), AggKind::TimeWeightedScalar => { let p = WaveXBinner::create($shape, $ak.clone()); @@ -260,6 +261,7 @@ impl MultiBinWaveEvents { use MultiBinWaveEvents::*; match self { Byte(k) => match ak { + AggKind::EventBlobs => panic!(), AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::MultiBinWave(MultiBinWaveEvents::Byte(k))), AggKind::TimeWeightedScalar => err::todoval(), AggKind::DimXBins1 => err::todoval(), @@ -347,6 +349,7 @@ impl SingleBinWaveEvents { use SingleBinWaveEvents::*; match self { Byte(k) => match ak { + AggKind::EventBlobs => panic!(), AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::Byte(k))), AggKind::TimeWeightedScalar => err::todoval(), AggKind::DimXBins1 => err::todoval(), diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index f860e6f..8cbcf27 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -53,6 +53,7 @@ where Shape::Scalar => { let evs = EventValuesDim0Case::new(); match agg_kind { + AggKind::EventBlobs => panic!(), AggKind::TimeWeightedScalar | AggKind::DimXBins1 => { let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); make_num_pipeline_nty_end_evs_enp::( @@ -83,6 +84,7 @@ where Shape::Wave(n) => { let evs = EventValuesDim1Case::new(n); match agg_kind { + AggKind::EventBlobs => panic!(), AggKind::TimeWeightedScalar | AggKind::DimXBins1 => { let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); make_num_pipeline_nty_end_evs_enp::( diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index 462682a..4f99060 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -382,6 +382,7 @@ impl AppendToUrl for BinnedQuery { fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) { let mut g = url.query_pairs_mut(); match agg_kind { + AggKind::EventBlobs => panic!(), AggKind::TimeWeightedScalar => { g.append_pair("binningScheme", "timeWeightedScalar"); } @@ -403,7 +404,9 @@ fn agg_kind_from_binning_scheme(pairs: &BTreeMap) -> Result { // match agg_kind { + AggKind::EventBlobs => panic!(), AggKind::Plain => { let evs = EventValuesDim0Case::new(); let events_node_proc = < as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); @@ -118,6 +119,7 @@ where Shape::Wave(n) => { // match agg_kind { + AggKind::EventBlobs => panic!(), AggKind::Plain => { let evs = EventValuesDim1Case::new(n); let events_node_proc = < as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 8e812bd..a12d483 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -113,20 +113,28 @@ async fn position_file( match OpenOptions::new().read(true).open(&index_path).await { Ok(mut index_file) => { let meta = index_file.metadata().await?; - if meta.len() > 1024 * 1024 * 80 { + if meta.len() > 1024 * 1024 * 120 { let msg = format!( "too large index file {} bytes for {}", meta.len(), channel_config.channel.name ); + error!("{}", msg); return Err(Error::with_msg(msg)); + } else if meta.len() > 1024 * 1024 * 80 { + let msg = format!( + "very large index file {} bytes for {}", + meta.len(), + channel_config.channel.name + ); + warn!("{}", msg); } else if meta.len() > 1024 * 1024 * 20 { let msg = format!( "large index file {} bytes for {}", meta.len(), channel_config.channel.name ); - warn!("{}", msg); + info!("{}", msg); } if meta.len() < 2 { return Err(Error::with_msg(format!( diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index f10e6ae..d88ac4d 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -4,10 +4,13 @@ use bytes::{Buf, BytesMut}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use items::{Appendable, PushableIndex, RangeCompletableItem, StatsItem, StreamItem, WithLen, WithTimestamps}; +use items::{ + Appendable, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem, WithLen, WithTimestamps, +}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::path::PathBuf; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; @@ -33,6 +36,7 @@ pub struct EventChunker { expand: bool, seen_before_range_count: usize, seen_after_range_count: usize, + unordered_warn_count: usize, } enum DataFileState { @@ -87,6 +91,7 @@ impl EventChunker { expand, seen_before_range_count: 0, seen_after_range_count: 0, + unordered_warn_count: 0, } } @@ -168,15 +173,20 @@ impl EventChunker { let pulse = sl.read_i64::().unwrap() as u64; let max_ts = self.max_ts.load(Ordering::SeqCst); if ts < max_ts { - Err(Error::with_msg(format!( - "unordered event ts: {}.{} max_ts {}.{} config {:?} path {:?}", - ts / SEC, - ts % SEC, - max_ts / SEC, - max_ts % SEC, - self.channel_config.shape, - self.path - )))?; + if self.unordered_warn_count < 20 { + let msg = format!( + "unordered event no {} ts: {}.{} max_ts {}.{} config {:?} path {:?}", + self.unordered_warn_count, + ts / SEC, + ts % SEC, + max_ts / SEC, + max_ts % SEC, + self.channel_config.shape, + self.path + ); + warn!("{}", msg); + self.unordered_warn_count += 1; + } } self.max_ts.store(ts, Ordering::SeqCst); if ts >= self.range.end { @@ -242,6 +252,17 @@ impl EventChunker { for i1 in 0..shape_dim { shape_lens[i1 as usize] = sl.read_u32::().unwrap(); } + let shape_this = { + if is_shaped { + if shape_dim == 1 { + Shape::Wave(shape_lens[0]) + } else { + err::todoval() + } + } else { + Shape::Scalar + } + }; if is_compressed { //debug!("event ts {} is_compressed {}", ts, is_compressed); let value_bytes = sl.read_u64::().unwrap(); @@ -291,6 +312,7 @@ impl EventChunker { Some(decomp), ScalarType::from_dtype_index(type_index)?, is_big_endian, + shape_this, ); } Err(e) => { @@ -311,6 +333,7 @@ impl EventChunker { Some(decomp), ScalarType::from_dtype_index(type_index)?, is_big_endian, + shape_this, ); } buf.advance(len as usize); @@ -331,13 +354,48 @@ impl EventChunker { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct EventFull { pub tss: Vec, pub pulses: Vec, + #[serde(serialize_with = "decomps_ser", deserialize_with = "decomps_de")] pub decomps: Vec>, pub scalar_types: Vec, pub be: Vec, + pub shapes: Vec, +} + +fn decomps_ser(t: &Vec>, s: S) -> Result +where + S: Serializer, +{ + let a: Vec<_> = t + .iter() + .map(|k| match k { + None => None, + Some(j) => Some(j[..].to_vec()), + }) + .collect(); + Serialize::serialize(&a, s) +} + +fn decomps_de<'de, D>(d: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + let a: Vec>> = Deserialize::deserialize(d)?; + let a = a + .iter() + .map(|k| match k { + None => None, + Some(j) => { + let mut a = BytesMut::new(); + a.extend_from_slice(&j); + Some(a) + } + }) + .collect(); + Ok(a) } impl EventFull { @@ -348,18 +406,32 @@ impl EventFull { decomps: vec![], scalar_types: vec![], be: vec![], + shapes: vec![], } } - fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option, scalar_type: ScalarType, be: bool) { + fn add_event( + &mut self, + ts: u64, + pulse: u64, + decomp: Option, + scalar_type: ScalarType, + be: bool, + shape: Shape, + ) { self.tss.push(ts); self.pulses.push(pulse); self.decomps.push(decomp); self.scalar_types.push(scalar_type); self.be.push(be); + self.shapes.push(shape); } } +impl SitemtyFrameType for EventFull { + const FRAME_TYPE_ID: u32 = items::EVENT_FULL_FRAME_TYPE_ID; +} + impl WithLen for EventFull { fn len(&self) -> usize { self.tss.len() @@ -378,6 +450,7 @@ impl Appendable for EventFull { self.decomps.extend_from_slice(&src.decomps); self.scalar_types.extend_from_slice(&src.scalar_types); self.be.extend_from_slice(&src.be); + self.shapes.extend_from_slice(&src.shapes); } } @@ -395,6 +468,7 @@ impl PushableIndex for EventFull { self.decomps.push(src.decomps[ix].clone()); self.scalar_types.push(src.scalar_types[ix].clone()); self.be.push(src.be[ix]); + self.shapes.push(src.shapes[ix].clone()); } } diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 4eb63f8..51345e4 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -11,6 +11,7 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; +pub mod mergedblobsfromremotes; pub mod mergedfromremotes; enum MergedCurVal { diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs new file mode 100644 index 0000000..1615846 --- /dev/null +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -0,0 +1,106 @@ +use crate::eventchunker::EventFull; +use crate::mergeblobs::MergedBlobsStream; +use crate::raw::client::x_processed_event_blobs_stream_from_node; +use err::Error; +use futures_core::Stream; +use futures_util::{pin_mut, StreamExt}; +use items::Sitemty; +use netpod::log::*; +use netpod::query::RawEventsQuery; +use netpod::{Cluster, PerfOpts}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +type T001 = Pin> + Send>>; +type T002 = Pin, Error>> + Send>>; + +pub struct MergedBlobsFromRemotes { + tcp_establish_futs: Vec>, + nodein: Vec>>, + merged: Option>, + completed: bool, + errored: bool, +} + +impl MergedBlobsFromRemotes { + pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { + info!("MergedBlobsFromRemotes evq {:?}", evq); + let mut tcp_establish_futs = vec![]; + for node in &cluster.nodes { + let f = x_processed_event_blobs_stream_from_node(evq.clone(), perf_opts.clone(), node.clone()); + let f: T002 = Box::pin(f); + tcp_establish_futs.push(f); + } + let n = tcp_establish_futs.len(); + Self { + tcp_establish_futs, + nodein: (0..n).into_iter().map(|_| None).collect(), + merged: None, + completed: false, + errored: false, + } + } +} + +impl Stream for MergedBlobsFromRemotes { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + break if self.completed { + panic!("poll_next on completed"); + } else if self.errored { + self.completed = true; + return Ready(None); + } else if let Some(fut) = &mut self.merged { + match fut.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => Ready(Some(Ok(k))), + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.completed = true; + Ready(None) + } + Pending => Pending, + } + } else { + let mut pend = false; + let mut c1 = 0; + for i1 in 0..self.tcp_establish_futs.len() { + if self.nodein[i1].is_none() { + let f = &mut self.tcp_establish_futs[i1]; + pin_mut!(f); + match f.poll(cx) { + Ready(Ok(k)) => { + self.nodein[i1] = Some(k); + } + Ready(Err(e)) => { + self.errored = true; + return Ready(Some(Err(e))); + } + Pending => { + pend = true; + } + } + } else { + c1 += 1; + } + } + if pend { + Pending + } else { + if c1 == self.tcp_establish_futs.len() { + let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); + let s1 = MergedBlobsStream::new(inps); + self.merged = Some(Box::pin(s1)); + } + continue 'outer; + } + }; + } + } +} diff --git a/disk/src/raw/client.rs b/disk/src/raw/client.rs index 3487c28..dd69d1c 100644 --- a/disk/src/raw/client.rs +++ b/disk/src/raw/client.rs @@ -5,6 +5,7 @@ Delivers event data (not yet time-binned) from local storage and provides client to request such data from nodes. */ +use crate::eventchunker::EventFull; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::raw::eventsfromframes::EventsFromFrames; use err::Error; @@ -41,3 +42,27 @@ where let items = EventsFromFrames::new(frames); Ok(Box::pin(items)) } + +pub async fn x_processed_event_blobs_stream_from_node( + query: RawEventsQuery, + perf_opts: PerfOpts, + node: Node, +) -> Result> + Send>>, Error> { + netpod::log::info!( + "x_processed_event_blobs_stream_from_node to: {}:{}", + node.host, + node.port_raw + ); + let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; + let qjs = serde_json::to_string(&query)?; + let (netin, mut netout) = net.into_split(); + let buf = make_frame(&EventQueryJsonStringFrame(qjs))?; + netout.write_all(&buf).await?; + let buf = make_term_frame(); + netout.write_all(&buf).await?; + netout.flush().await?; + netout.forget(); + let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); + let items = EventsFromFrames::new(frames); + Ok(Box::pin(items)) +} diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 5202af0..0faae02 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -49,6 +49,7 @@ where macro_rules! pipe4 { ($nty:ident, $end:ident, $shape:expr, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => { match $agg_kind { + AggKind::EventBlobs => panic!(), AggKind::TimeWeightedScalar | AggKind::DimXBins1 => { make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>( $evsv, @@ -191,3 +192,64 @@ pub async fn make_event_pipe( ); Ok(pipe) } + +pub async fn make_event_blobs_pipe( + evq: &RawEventsQuery, + node_config: &NodeConfigCached, +) -> Result> + Send>>, Error> { + if false { + match dbconn::channel_exists(&evq.channel, &node_config).await { + Ok(_) => (), + Err(e) => return Err(e)?, + } + } + let range = &evq.range; + let channel_config = match read_local_config(&evq.channel, &node_config.node).await { + Ok(k) => k, + Err(e) => { + if e.msg().contains("ErrorKind::NotFound") { + let s = futures_util::stream::empty(); + return Ok(Box::pin(s)); + } else { + return Err(e)?; + } + } + }; + let entry_res = match extract_matching_config_entry(range, &channel_config) { + Ok(k) => k, + Err(e) => return Err(e)?, + }; + let entry = match entry_res { + MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, + MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?, + MatchingConfigEntry::Entry(entry) => entry, + }; + let shape = match entry.to_shape() { + Ok(k) => k, + Err(e) => return Err(e)?, + }; + let channel_config = netpod::ChannelConfig { + channel: evq.channel.clone(), + keyspace: entry.ks as u8, + time_bin_size: entry.bs, + shape: shape, + scalar_type: entry.scalar_type.clone(), + byte_order: entry.byte_order.clone(), + array: entry.is_array, + compression: entry.is_compressed, + }; + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let event_blobs = EventChunkerMultifile::new( + range.clone(), + channel_config.clone(), + node_config.node.clone(), + node_config.ix, + evq.disk_io_buffer_size, + event_chunker_conf, + true, + ); + let s = event_blobs.map(|item| Box::new(item) as Box); + let pipe: Pin> + Send>>; + pipe = Box::pin(s); + Ok(pipe) +} diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index d3e6fcb..672a276 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -18,10 +18,12 @@ futures-util = "0.3.14" tracing = "0.1.25" async-channel = "1.6" itertools = "0.10.1" +chrono = "0.4.19" err = { path = "../err" } netpod = { path = "../netpod" } dbconn = { path = "../dbconn" } disk = { path = "../disk" } +items = { path = "../items" } parse = { path = "../parse" } netfetch = { path = "../netfetch" } archapp_wrap = { path = "../archapp_wrap" } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index ae771a6..4992f3e 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -1,11 +1,15 @@ use crate::gather::{gather_get_json_generic, SubRes}; -use crate::response; +use crate::{response, BodyStream}; +use bytes::{BufMut, BytesMut}; use err::Error; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; +use items::{RangeCompletableItem, StreamItem}; use itertools::Itertools; -use netpod::{log::*, NodeConfigCached, APP_OCTET}; +use netpod::query::RawEventsQuery; +use netpod::{log::*, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, APP_OCTET}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; +use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::future::Future; @@ -463,24 +467,204 @@ pub async fn proxy_distribute_v1(req: Request) -> Result, E Ok(res) } -pub async fn api1_binary_events(req: Request, _node_config: &NodeConfigCached) -> Result, Error> { +#[derive(Debug, Serialize, Deserialize)] +pub struct Api1Range { + #[serde(rename = "startDate")] + start_date: String, + #[serde(rename = "endDate")] + end_date: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Api1Query { + channels: Vec, + range: Api1Range, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Api1ChannelHeader { + name: String, + #[serde(rename = "type")] + ty: String, + #[serde(rename = "byteOrder")] + byte_order: String, + shape: Vec, + compression: Option, +} + +pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { info!("api1_binary_events headers: {:?}", req.headers()); let accept_def = ""; let accept = req .headers() .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept == APP_OCTET { - // Ok(plain_events_binary(req, node_config).await.map_err(|e| { - // error!("{:?}", e); - // e - // })?) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)) + .to_owned(); + let (_head, body) = req.into_parts(); + let body_data = hyper::body::to_bytes(body).await?; + let qu: Api1Query = serde_json::from_slice(&body_data)?; + info!("got Api1Query: {:?}", qu); + let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date); + let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date); + let beg_date = beg_date?; + let end_date = end_date?; + info!("beg_date {:?} end_date {:?}", beg_date, end_date); + //let url = Url::parse(&format!("dummy:{}", req.uri()))?; + //let query = PlainEventsBinaryQuery::from_url(&url)?; + // TODO add stricter check for types, check with client. + if accept == APP_OCTET {} + if false { let e = Error::with_msg(format!("unexpected Accept: {:?}", accept)); error!("{:?}", e); - Err(e) - } else { - let e = Error::with_msg(format!("unexpected Accept: {:?}", accept)); - error!("{:?}", e); - Err(e) + return Err(e); + } + let beg_ns = beg_date.timestamp() as u64 * 1000000000 + beg_date.timestamp_subsec_nanos() as u64; + let end_ns = end_date.timestamp() as u64 * 1000000000 + end_date.timestamp_subsec_nanos() as u64; + let range = NanoRange { + beg: beg_ns, + end: end_ns, + }; + // TODO to server multiple channels, I need to wrap the loop over channels in a Stream itself. + let channel = qu.channels[0].clone(); + let channel = Channel { + backend: "sf-databuffer".into(), + name: channel, + }; + let channel_config = { + let channel_config = match read_local_config(&channel, &node_config.node).await { + Ok(k) => k, + Err(e) => { + error!("api1_binary_events error {:?}", e); + return Err(Error::with_msg_no_trace("can not parse channel config")); + } + }; + let entry_res = match extract_matching_config_entry(&range, &channel_config) { + Ok(k) => k, + Err(e) => return Err(e)?, + }; + let entry = match entry_res { + MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, + MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?, + MatchingConfigEntry::Entry(entry) => entry, + }; + entry.clone() + }; + warn!("found channel_config {:?}", channel_config); + let evq = RawEventsQuery { + channel: channel.clone(), + range, + agg_kind: netpod::AggKind::EventBlobs, + disk_io_buffer_size: 1024 * 4, + }; + let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 }; + let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new( + evq, + perf_opts, + node_config.node_config.cluster.clone(), + ); + use futures_util::StreamExt; + let s = s.map({ + let mut header_out = false; + let mut count_events = 0; + move |b| { + let ret = match b { + Ok(b) => { + let f = match b { + StreamItem::DataItem(RangeCompletableItem::Data(b)) => { + let mut d = BytesMut::new(); + for i1 in 0..b.tss.len() { + if count_events < 6 { + info!( + "deco len {:?} BE {} scalar-type {:?} shape {:?}", + b.decomps[i1].as_ref().map(|x| x.len()), + b.be[i1], + b.scalar_types[i1], + b.shapes[i1] + ); + } + if !header_out { + let head = Api1ChannelHeader { + name: channel.name.clone(), + ty: scalar_type_to_api3proto(&b.scalar_types[i1]).into(), + byte_order: if b.be[i1] { + "BIG_ENDIAN".into() + } else { + "LITTLE_ENDIAN".into() + }, + // The shape is inconsistent on the events. + // Seems like the config is to be trusted in this case. + shape: shape_to_api3proto(&channel_config.shape), + //shape: vec![2560], + compression: None, + }; + let h = serde_json::to_string(&head)?; + info!("sending channel header {}", h); + let l1 = 1 + h.as_bytes().len() as u32; + d.put_u32(l1); + d.put_u8(0); + d.extend_from_slice(h.as_bytes()); + d.put_u32(l1); + header_out = true; + } + { + if let Some(deco) = &b.decomps[i1] { + let l1 = 17 + deco.len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&deco); + d.put_u32(l1); + } + } + count_events += 1; + } + d + } + _ => { + // + BytesMut::new() + } + }; + Ok(f) + } + Err(e) => Err(e), + }; + ret + } + }); + let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", "dummy"); + let ret = ret.body(BodyStream::wrapped(s, format!("plain_events")))?; + Ok(ret) +} + +fn scalar_type_to_api3proto(sty: &ScalarType) -> &'static str { + match sty { + ScalarType::U8 => "uint8", + ScalarType::U16 => "uint16", + ScalarType::U32 => "uint32", + ScalarType::U64 => "uint64", + ScalarType::I8 => "int8", + ScalarType::I16 => "int16", + ScalarType::I32 => "int32", + ScalarType::I64 => "int64", + ScalarType::F32 => "float32", + ScalarType::F64 => "float64", + ScalarType::BOOL => "bool", + } +} + +fn shape_to_api3proto(sh: &Option>) -> Vec { + match sh { + None => vec![], + Some(g) => { + if g.len() == 1 { + vec![g[0]] + } else if g.len() == 2 { + vec![g[0], g[1]] + } else { + err::todoval() + } + } } } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 575627d..bcffcd5 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -246,6 +246,9 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) } + } else if path.starts_with("/api/1/requestStatus/") { + info!("{}", path); + Ok(response(StatusCode::OK).body(Body::from("{}"))?) } else if path.starts_with("/api/1/documentation/") { if req.method() == Method::GET { api_1_docs(path) diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs index 14eb366..faf5be7 100644 --- a/items/src/eventvalues.rs +++ b/items/src/eventvalues.rs @@ -25,7 +25,7 @@ impl SitemtyFrameType for EventValues where NTY: NumOps, { - const FRAME_TYPE_ID: u32 = 0x500 + NTY::SUB; + const FRAME_TYPE_ID: u32 = crate::EVENT_VALUES_FRAME_TYPE_ID + NTY::SUB; } impl EventValues { diff --git a/items/src/lib.rs b/items/src/lib.rs index bbf384e..81f6e4c 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -27,6 +27,16 @@ pub mod waveevents; pub mod xbinnedscalarevents; pub mod xbinnedwaveevents; +pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100; +pub const EVENT_VALUES_FRAME_TYPE_ID: u32 = 0x500; +pub const MIN_MAX_AVG_BINS: u32 = 0x700; +pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x800; +pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; +pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x900; +pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; +pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0xb00; +pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; + pub fn bool_is_false(j: &bool) -> bool { *j == false } @@ -193,7 +203,7 @@ pub trait FrameType { } impl FrameType for EventQueryJsonStringFrame { - const FRAME_TYPE_ID: u32 = 0x100; + const FRAME_TYPE_ID: u32 = EVENT_QUERY_JSON_STRING_FRAME; } impl FrameType for Sitemty diff --git a/items/src/minmaxavgbins.rs b/items/src/minmaxavgbins.rs index 10b47f4..96c3363 100644 --- a/items/src/minmaxavgbins.rs +++ b/items/src/minmaxavgbins.rs @@ -30,7 +30,7 @@ impl SitemtyFrameType for MinMaxAvgBins where NTY: SubFrId, { - const FRAME_TYPE_ID: u32 = 0x700 + NTY::SUB; + const FRAME_TYPE_ID: u32 = crate::MIN_MAX_AVG_BINS + NTY::SUB; } impl fmt::Debug for MinMaxAvgBins diff --git a/items/src/minmaxavgdim1bins.rs b/items/src/minmaxavgdim1bins.rs index c07baf6..bfd2459 100644 --- a/items/src/minmaxavgdim1bins.rs +++ b/items/src/minmaxavgdim1bins.rs @@ -31,7 +31,7 @@ impl SitemtyFrameType for MinMaxAvgDim1Bins where NTY: SubFrId, { - const FRAME_TYPE_ID: u32 = 0xb00 + NTY::SUB; + const FRAME_TYPE_ID: u32 = crate::MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID + NTY::SUB; } impl fmt::Debug for MinMaxAvgDim1Bins diff --git a/items/src/minmaxavgwavebins.rs b/items/src/minmaxavgwavebins.rs index d7069f4..b76adeb 100644 --- a/items/src/minmaxavgwavebins.rs +++ b/items/src/minmaxavgwavebins.rs @@ -29,7 +29,7 @@ impl SitemtyFrameType for MinMaxAvgWaveBins where NTY: SubFrId, { - const FRAME_TYPE_ID: u32 = 0xa00 + NTY::SUB; + const FRAME_TYPE_ID: u32 = crate::MIN_MAX_AVG_WAVE_BINS + NTY::SUB; } impl fmt::Debug for MinMaxAvgWaveBins diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index 7aa9495..f6c10f3 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -24,7 +24,7 @@ impl SitemtyFrameType for WaveEvents where NTY: SubFrId, { - const FRAME_TYPE_ID: u32 = 0x800 + NTY::SUB; + const FRAME_TYPE_ID: u32 = crate::WAVE_EVENTS_FRAME_TYPE_ID + NTY::SUB; } impl WaveEvents { diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 5ca5836..204b24a 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -24,7 +24,7 @@ impl SitemtyFrameType for XBinnedScalarEvents where NTY: SubFrId, { - const FRAME_TYPE_ID: u32 = 0x600 + NTY::SUB; + const FRAME_TYPE_ID: u32 = crate::X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID + NTY::SUB; } impl XBinnedScalarEvents { diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index 7fb5de1..46a14f9 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -25,7 +25,7 @@ impl SitemtyFrameType for XBinnedWaveEvents where NTY: SubFrId, { - const FRAME_TYPE_ID: u32 = 0x900 + NTY::SUB; + const FRAME_TYPE_ID: u32 = crate::X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID + NTY::SUB; } impl XBinnedWaveEvents { diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 0338b9e..3aa432e 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -720,6 +720,7 @@ impl BinnedRange { #[derive(Clone, Serialize, Deserialize)] pub enum AggKind { + EventBlobs, DimXBins1, DimXBinsN(u32), Plain, @@ -729,6 +730,7 @@ pub enum AggKind { impl AggKind { pub fn do_time_weighted(&self) -> bool { match self { + Self::EventBlobs => false, Self::TimeWeightedScalar => true, Self::DimXBins1 => false, Self::DimXBinsN(_) => false, @@ -739,6 +741,7 @@ impl AggKind { pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize { match agg_kind { + AggKind::EventBlobs => 0, AggKind::TimeWeightedScalar => 0, AggKind::DimXBins1 => 0, AggKind::DimXBinsN(n) => { @@ -761,6 +764,9 @@ pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize { impl fmt::Display for AggKind { fn fmt(&self, fmt: &mut fmt::Formatter) -> std::fmt::Result { match self { + Self::EventBlobs => { + write!(fmt, "EventBlobs") + } Self::DimXBins1 => { write!(fmt, "DimXBins1") } @@ -788,7 +794,9 @@ impl FromStr for AggKind { fn from_str(s: &str) -> Result { let nmark = "DimXBinsN"; - if s == "DimXBins1" { + if s == "EventBlobs" { + Ok(AggKind::EventBlobs) + } else if s == "DimXBins1" { Ok(AggKind::DimXBins1) } else if s == "TimeWeightedScalar" { Ok(AggKind::TimeWeightedScalar) diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index f6e8c5c..86b4f90 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -5,8 +5,8 @@ use futures_core::Stream; use futures_util::StreamExt; use items::frame::{decode_frame, make_term_frame}; use items::{Framable, StreamItem}; -use netpod::log::*; use netpod::query::RawEventsQuery; +use netpod::{log::*, AggKind}; use netpod::{EventQueryJsonStringFrame, NodeConfigCached, PerfOpts}; use std::net::SocketAddr; use std::pin::Pin; @@ -114,7 +114,7 @@ async fn events_conn_handler_inner_try( return Err((Error::with_msg("json parse error"), netout))?; } }; - info!("---------------------------------------------------\nevq {:?}", evq); + info!("--- nodenet::conn got query -------------------\nevq {:?}", evq); let mut p1: Pin> + Send>> = if let Some(aa) = &node_config.node.archiver_appliance { @@ -123,9 +123,15 @@ async fn events_conn_handler_inner_try( Err(e) => return Err((e, netout))?, } } else { - match disk::raw::conn::make_event_pipe(&evq, node_config).await { - Ok(j) => j, - Err(e) => return Err((e, netout))?, + match evq.agg_kind { + AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { + Ok(j) => j, + Err(e) => return Err((e, netout))?, + }, + _ => match disk::raw::conn::make_event_pipe(&evq, node_config).await { + Ok(j) => j, + Err(e) => return Err((e, netout))?, + }, } }; diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 3718109..e950fa2 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -24,7 +24,7 @@ where Err(nom::Err::Error(e)) } -#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] +#[derive(Clone, Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] pub enum CompressionMethod { BitshuffleLZ4 = 0, } @@ -35,7 +35,7 @@ impl CompressionMethod { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ConfigEntry { pub ts: u64, pub pulse: i64, @@ -290,6 +290,7 @@ pub async fn read_local_config(channel: &Channel, node: &Node) -> Result { None, Multiple,