From a8f15da101b7c48b59ff669de801ba322829ad49 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Mon, 5 Jul 2021 23:29:42 +0200
Subject: [PATCH] WIP on get channel info for arch app

---
 .cargo/config.toml                  |   1 +
 Cargo.toml                          |   7 +-
 archapp/Cargo.toml                  |  20 +-
 archapp/src/events.rs               |  34 +++
 archapp/src/lib.rs                  |  18 +-
 archapp/src/parse.rs                |  85 ++++++--
 archapp/src/parsestub.rs            |   2 +-
 archapp_wrap/Cargo.toml             |  18 ++
 archapp_wrap/src/lib.rs             |  28 +++
 archapp_xc/Cargo.toml               |  12 ++
 archapp_xc/src/lib.rs               |  18 ++
 daqbuffer/Cargo.toml                |   2 +-
 daqbufp2/Cargo.toml                 |   3 +-
 daqbufp2/src/client.rs              |   4 +-
 daqbufp2/src/test/binnedbinary.rs   |   6 +-
 daqbufp2/src/test/events.rs         |   6 +-
 dbconn/Cargo.toml                   |   2 +-
 disk/Cargo.toml                     |   3 +-
 disk/src/agg/binnedt.rs             |   6 +-
 disk/src/agg/enp.rs                 |  22 ++
 disk/src/agg/eventbatch.rs          |   9 +-
 disk/src/agg/scalarbinbatch.rs      |   9 +-
 disk/src/agg/streams.rs             |  16 +-
 disk/src/binned.rs                  |  27 ++-
 disk/src/binned/binnedfrompbv.rs    |   6 +-
 disk/src/binned/dim1.rs             |   9 +-
 disk/src/binned/pbv.rs              |   9 +-
 disk/src/binned/prebinned.rs        |   3 +-
 disk/src/channelexec.rs             |   9 +-
 disk/src/decode.rs                  |  12 +-
 disk/src/eventblobs.rs              |   4 +-
 disk/src/eventchunker.rs            |   3 +-
 disk/src/frame/inmem.rs             |   4 +-
 disk/src/frame/makeframe.rs         | 249 +---------------------
 disk/src/lib.rs                     |   4 -
 disk/src/merge.rs                   |   7 +-
 disk/src/merge/mergedfromremotes.rs |   6 +-
 disk/src/raw.rs                     |  60 +-----
 disk/src/raw/client.rs              |  43 ++++
 disk/src/raw/conn.rs                | 181 ++--------------
 disk/src/raw/eventsfromframes.rs    |   5 +-
 disk/src/streamlog.rs               |  84 +-------
 err/Cargo.toml                      |   2 +-
 httpret/Cargo.toml                  |   5 +-
 httpret/src/lib.rs                  |  27 ++-
 items/Cargo.toml                    |  12 ++
 items/src/lib.rs                    | 315 ++++++++++++++++++++++++++++
 netfetch/Cargo.toml                 |   2 +-
 netpod/Cargo.toml                   |   1 +
 netpod/src/lib.rs                   |  27 ++-
 netpod/src/query.rs                 |  13 ++
 nodenet/Cargo.toml                  |  29 +++
 nodenet/src/conn.rs                 | 154 ++++++++++++++
 nodenet/src/lib.rs                  |   1 +
 parse/Cargo.toml                    |   2 +-
 taskrun/Cargo.toml                  |   2 +-
 56 files changed, 956 insertions(+), 692 deletions(-)
 create mode 100644 archapp/src/events.rs
 create mode 100644 archapp_wrap/Cargo.toml
 create mode 100644 archapp_wrap/src/lib.rs
 create mode 100644 archapp_xc/Cargo.toml
 create mode 100644 archapp_xc/src/lib.rs
 create mode 100644 disk/src/raw/client.rs
 create mode 100644 items/Cargo.toml
 create mode 100644 items/src/lib.rs
 create mode 100644 netpod/src/query.rs
 create mode 100644 nodenet/Cargo.toml
 create mode 100644 nodenet/src/conn.rs
 create mode 100644 nodenet/src/lib.rs

diff --git a/.cargo/config.toml b/.cargo/config.toml
index 6994f47..f0e7617 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -4,6 +4,7 @@ rustflags = [
     "-C", "force-unwind-tables=yes",
     "-C", "embed-bitcode=no",
     "-C", "relocation-model=pic",
+    #"-C", "inline-threshold=1000",
     #"-Z", "time-passes=yes",
     #"-Z", "time-llvm-passes=yes",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 4b0d914..d0508a8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,9 +1,10 @@
 [workspace]
-members = ["daqbuffer", "h5out", "archapp"]
+members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet"]
 
 [profile.release]
 debug = 1
-opt-level = 2
+opt-level = 1
 #overflow-checks = true
 #debug-assertions = true
-lto = false
+lto = "off"
+codegen-units = 40
diff --git a/archapp/Cargo.toml b/archapp/Cargo.toml
index e4b001b..e28b6b9 100644
--- a/archapp/Cargo.toml
+++ b/archapp/Cargo.toml
@@ -5,22 +5,24 @@ authors = ["Dominik Werder "]
 edition = "2018"
 
 [dependencies]
-tokio = { version = "1.4.0", features = ["io-util", "net", "time", "sync", "fs"] }
-tracing = "0.1.25"
-futures-core = "0.3.14"
-futures-util = "0.3.14"
+tokio = { version = "1.7.1", features = ["io-util", "net", "time", "sync", "fs"] }
+tracing = "0.1.26"
+futures-core = "0.3.15"
+futures-util = "0.3.15"
 bytes = "1.0.1"
-serde = "1.0"
-serde_derive = "1.0"
-serde_json = "1.0"
-chrono = "0.4"
+serde = "1.0.126"
+serde_derive = "1.0.126"
+serde_json = "1.0.64"
+chrono = "0.4.19"
 protobuf = "2.24.1"
 async-channel = "1.6"
+archapp_xc = { path = "../archapp_xc" }
 err = { path = "../err" }
 taskrun = { path = "../taskrun" }
 netpod = { path = "../netpod" }
 dbconn = { path = "../dbconn" }
+items = { path = "../items" }
 
 [features]
-default = []
+default = ["devread"]
 devread = []
diff --git a/archapp/src/events.rs b/archapp/src/events.rs
new file mode 100644
index 0000000..a637950
--- /dev/null
+++ b/archapp/src/events.rs
@@ -0,0 +1,34 @@
+use err::Error;
+use futures_core::Stream;
+use items::Framable;
+use netpod::log::*;
+use netpod::query::RawEventsQuery;
+use netpod::{ArchiverAppliance, Channel, ChannelInfo, NodeConfigCached, Shape};
+use std::pin::Pin;
+
+pub async fn make_event_pipe(
+    _evq: &RawEventsQuery,
+    _aa: &ArchiverAppliance,
+) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
+    err::todoval()
+}
+
+pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) -> Result<ChannelInfo, Error> {
+    // SARUN11/CVME/DBLM546/IOC_CPU_LOAD
+    // SARUN11-CVME-DBLM546:IOC_CPU_LOAD
+    let a: Vec<_> = channel.name.split("-").map(|s| s.split(":")).flatten().collect();
+    let path1 = node_config
+        .node
+        .archiver_appliance
+        .as_ref()
+        .unwrap()
+        .data_base_path
+        .clone();
+    let path2 = a.iter().fold(path1, |a, &x| a.join(x));
+    info!("path2: {}", path2.to_str().unwrap());
+    let ret = ChannelInfo {
+        shape: Shape::Scalar,
+        msg: format!("{:?} path2: {:?}", a, path2),
+    };
+    Ok(ret)
+}
diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs
index b35ca2a..b3ee25c 100644
--- a/archapp/src/lib.rs
+++ b/archapp/src/lib.rs
@@ -1,5 +1,4 @@
 use err::Error;
-use serde::Serialize;
 
 #[cfg(feature = "devread")]
 pub mod generated;
@@ -11,25 +10,12 @@ pub mod parse;
 pub mod parsestub;
 #[cfg(not(feature = "devread"))]
 pub use parsestub as parse;
+pub mod events;
 #[cfg(feature = "devread")]
 #[cfg(test)]
 pub mod test;
 
-pub trait ItemSer {
-    fn serialize(&self) -> Result<Vec<u8>, Error>;
-}
-
-impl<T> ItemSer for T
-where
-    T: Serialize,
-{
-    fn serialize(&self) -> Result<Vec<u8>, Error> {
-        let u = serde_json::to_vec(self)?;
-        Ok(u)
-    }
-}
-
-pub fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
+fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
     let mut ret = Vec::with_capacity(inp.len() * 5 / 4);
     let mut esc = false;
     for &k in inp.iter() {
diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs
index 6972b28..82e5d6e 100644
--- a/archapp/src/parse.rs
+++ b/archapp/src/parse.rs
@@ -1,4 +1,5 @@
-use crate::{unescape_archapp_msg, ItemSer};
+use crate::unescape_archapp_msg;
+use archapp_xc::*;
 use async_channel::{bounded, Receiver};
 use err::Error;
 use netpod::log::*;
@@ -16,6 +17,9 @@ pub struct EpicsEventPayloadInfo {
     headers: Vec<(String, String)>,
     year: i32,
     pvname: String,
+    datatype: String,
+    ts0: u32,
+    val0: f32,
 }
 
 async fn read_pb_file(path: PathBuf) -> Result<EpicsEventPayloadInfo, Error> {
@@ -35,6 +39,15 @@ async fn read_pb_file(path: PathBuf) -> Result<EpicsEventPayloadInfo, Error> {
         }
     }
     let mut j1 = 0;
+    let mut payload_info = crate::generated::EPICSEvent::PayloadInfo::new();
+    let mut z = EpicsEventPayloadInfo {
+        pvname: String::new(),
+        headers: vec![],
+        year: 0,
+        datatype: String::new(),
+        ts0: 0,
+        val0: 0.0,
+    };
     loop {
         let mut i2 = usize::MAX;
         for (i1, &k) in
buf[j1..].iter().enumerate() { @@ -47,10 +60,10 @@ async fn read_pb_file(path: PathBuf) -> Result { //info!("got NL {} .. {}", j1, i2); let m = unescape_archapp_msg(&buf[j1..i2])?; if j1 == 0 { - let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m) + payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m) .map_err(|_| Error::with_msg("can not parse PayloadInfo"))?; //info!("got payload_info: {:?}", payload_info); - let z = EpicsEventPayloadInfo { + z = EpicsEventPayloadInfo { headers: payload_info .get_headers() .iter() @@ -58,12 +71,57 @@ async fn read_pb_file(path: PathBuf) -> Result { .collect(), year: payload_info.get_year(), pvname: payload_info.get_pvname().into(), + datatype: String::new(), + ts0: 0, + val0: 0.0, }; - return Ok(z); } else { - let _scalar_double = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m) - .map_err(|_| Error::with_msg("can not parse EPICSEvent::ScalarDouble"))?; - //info!("got scalar_double: {:?}", scalar_double); + let ft = payload_info.get_field_type(); + { + use crate::generated::EPICSEvent::PayloadType::*; + let (ts, val) = match ft { + SCALAR_BYTE => { + let d = crate::generated::EPICSEvent::ScalarByte::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse EPICSEvent::ScalarByte"))?; + (d.get_secondsintoyear(), d.get_val()[0] as f32) + } + SCALAR_SHORT => { + let d = crate::generated::EPICSEvent::ScalarShort::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse EPICSEvent::ScalarShort"))?; + (d.get_secondsintoyear(), d.get_val() as f32) + } + SCALAR_INT => { + let d = crate::generated::EPICSEvent::ScalarInt::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse EPICSEvent::ScalarInt"))?; + (d.get_secondsintoyear(), d.get_val() as f32) + } + SCALAR_FLOAT => { + let d = crate::generated::EPICSEvent::ScalarFloat::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse EPICSEvent::ScalarFloat"))?; + (d.get_secondsintoyear(), d.get_val() as f32) + } + SCALAR_DOUBLE => { + let d = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse EPICSEvent::ScalarDouble"))?; + (d.get_secondsintoyear(), d.get_val() as f32) + } + WAVEFORM_FLOAT => { + let d = crate::generated::EPICSEvent::VectorFloat::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse EPICSEvent::VectorFloat"))?; + (d.get_secondsintoyear(), d.get_val()[0] as f32) + } + WAVEFORM_DOUBLE => { + let d = crate::generated::EPICSEvent::VectorDouble::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse EPICSEvent::VectorDouble"))?; + (d.get_secondsintoyear(), d.get_val()[0] as f32) + } + _ => (0, 0.0), + }; + z.datatype = format!("{:?}", ft); + z.ts0 = ts; + z.val0 = val; + return Ok(z); + } } } else { //info!("no more packets"); @@ -71,12 +129,10 @@ async fn read_pb_file(path: PathBuf) -> Result { } j1 = i2 + 1; } - Err(Error::with_msg(format!("no header entry found in file"))) + Err(Error::with_msg(format!("no data found in file"))) } -type RT1 = Box; - -pub async fn scan_files( +pub async fn scan_files_inner( pairs: BTreeMap, node_config: NodeConfigCached, ) -> Result>, Error> { @@ -120,7 +176,7 @@ pub async fn scan_files( } } } else if meta.is_file() { - tx.send(Ok(Box::new(path.clone()) as RT1)).await?; + //tx.send(Ok(Box::new(path.clone()) as RT1)).await?; if path .to_str() .ok_or_else(|| Error::with_msg("invalid path string"))? 
@@ -143,7 +199,10 @@ pub async fn scan_files( )) .await?; } else { - dbconn::insert_channel(packet.pvname.clone(), ndi.facility, &dbc).await?; + if false { + dbconn::insert_channel(packet.pvname.clone(), ndi.facility, &dbc).await?; + } + tx.send(Ok(Box::new(serde_json::to_value(&packet)?) as RT1)).await?; } } } diff --git a/archapp/src/parsestub.rs b/archapp/src/parsestub.rs index f07ce71..864454c 100644 --- a/archapp/src/parsestub.rs +++ b/archapp/src/parsestub.rs @@ -1,4 +1,4 @@ -use crate::ItemSer; +use archapp_xc::ItemSer; use async_channel::Receiver; use err::Error; use netpod::NodeConfigCached; diff --git a/archapp_wrap/Cargo.toml b/archapp_wrap/Cargo.toml new file mode 100644 index 0000000..5da3e61 --- /dev/null +++ b/archapp_wrap/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "archapp_wrap" +version = "0.0.1-a.dev.4" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +serde = "1.0.126" +serde_json = "1.0.64" +async-channel = "1.6" +futures-core = "0.3.15" +futures-util = "0.3.15" +err = { path = "../err" } +netpod = { path = "../netpod" } +archapp = { path = "../archapp" } +archapp_xc = { path = "../archapp_xc" } +disk = { path = "../disk" } +items = { path = "../items" } diff --git a/archapp_wrap/src/lib.rs b/archapp_wrap/src/lib.rs new file mode 100644 index 0000000..411dfad --- /dev/null +++ b/archapp_wrap/src/lib.rs @@ -0,0 +1,28 @@ +use archapp_xc::RT1; +use async_channel::Receiver; +use err::Error; +use futures_core::Stream; +use items::Framable; +use netpod::query::RawEventsQuery; +use netpod::{ArchiverAppliance, Channel, ChannelInfo, NodeConfigCached}; +use std::collections::BTreeMap; +use std::future::Future; +use std::pin::Pin; + +pub fn scan_files( + pairs: BTreeMap, + node_config: NodeConfigCached, +) -> Pin>, Error>> + Send>> { + Box::pin(archapp::parse::scan_files_inner(pairs, node_config)) +} + +pub async fn make_event_pipe( + evq: &RawEventsQuery, + aa: &ArchiverAppliance, +) -> Result> + Send>>, Error> { + archapp::events::make_event_pipe(evq, aa).await +} + +pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) -> Result { + archapp::events::channel_info(channel, node_config).await +} diff --git a/archapp_xc/Cargo.toml b/archapp_xc/Cargo.toml new file mode 100644 index 0000000..df6ef5b --- /dev/null +++ b/archapp_xc/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "archapp_xc" +version = "0.0.1-a.dev.4" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +serde = "1.0.126" +serde_json = "1.0.64" +async-channel = "1.6" +err = { path = "../err" } +netpod = { path = "../netpod" } diff --git a/archapp_xc/src/lib.rs b/archapp_xc/src/lib.rs new file mode 100644 index 0000000..173bb57 --- /dev/null +++ b/archapp_xc/src/lib.rs @@ -0,0 +1,18 @@ +use err::Error; +use serde::Serialize; + +pub type RT1 = Box; + +pub trait ItemSer { + fn serialize(&self) -> Result, Error>; +} + +impl ItemSer for T +where + T: Serialize, +{ + fn serialize(&self) -> Result, Error> { + let u = serde_json::to_vec(self)?; + Ok(u) + } +} diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml index 6877efb..d641f98 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Dominik Werder "] edition = "2018" [dependencies] -tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = "0.14" http = "0.2" tracing = "0.1.25" diff --git a/daqbufp2/Cargo.toml 
b/daqbufp2/Cargo.toml index a287c54..5266784 100644 --- a/daqbufp2/Cargo.toml +++ b/daqbufp2/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Dominik Werder "] edition = "2018" [dependencies] -tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = "0.14" http = "0.2" tracing = "0.1.25" @@ -27,3 +27,4 @@ taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } httpret = { path = "../httpret" } disk = { path = "../disk" } +items = { path = "../items" } diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index c55adad..8f34739 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -1,15 +1,13 @@ use chrono::{DateTime, Utc}; use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use disk::agg::streams::StreamItem; use disk::binned::query::{BinnedQuery, CacheUsage}; -use disk::binned::RangeCompletableItem; use disk::frame::inmem::InMemoryFrameAsyncReadStream; -use disk::frame::makeframe::FrameType; use disk::streamlog::Streamlog; use err::Error; use futures_util::TryStreamExt; use http::StatusCode; use hyper::Body; +use items::{FrameType, RangeCompletableItem, StreamItem}; use netpod::log::*; use netpod::{AggKind, AppendToUrl, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET}; use url::Url; diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 0880417..176b180 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -1,16 +1,14 @@ use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; -use disk::agg::streams::{StatsItem, StreamItem}; use disk::binned::query::{BinnedQuery, CacheUsage}; -use disk::binned::{MinMaxAvgBins, RangeCompletableItem, WithLen}; +use disk::binned::{MinMaxAvgBins, WithLen}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; -use disk::frame::makeframe::{FrameType, SubFrId}; use disk::streamlog::Streamlog; -use disk::Sitemty; use err::Error; use futures_util::{StreamExt, TryStreamExt}; use http::StatusCode; use hyper::Body; +use items::{FrameType, RangeCompletableItem, Sitemty, StatsItem, StreamItem, SubFrId}; use netpod::log::*; use netpod::{AggKind, AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET}; use serde::de::DeserializeOwned; diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index ddb700b..4550878 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -1,17 +1,15 @@ use crate::nodes::require_test_hosts_running; use chrono::{DateTime, Utc}; -use disk::agg::streams::{StatsItem, StreamItem}; -use disk::binned::{NumOps, RangeCompletableItem, WithLen}; +use disk::binned::{NumOps, WithLen}; use disk::decode::EventValues; use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; -use disk::frame::makeframe::FrameType; use disk::streamlog::Streamlog; -use disk::Sitemty; use err::Error; use futures_util::{StreamExt, TryStreamExt}; use http::StatusCode; use hyper::Body; +use items::{FrameType, RangeCompletableItem, Sitemty, StatsItem, StreamItem}; use netpod::log::*; use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET}; use serde_json::Value as JsonValue; diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index e48e23c..7ee2a95 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] 
serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } tracing = "0.1.25" crc32fast = "1.2.1" arrayref = "0.3.6" diff --git a/disk/Cargo.toml b/disk/Cargo.toml index a361243..ae031f4 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -10,7 +10,7 @@ serde_json = "1.0" serde_cbor = "0.11.1" http = "0.2" chrono = { version = "0.4.19", features = ["serde"] } -tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } tokio-stream = {version = "0.1.5", features = ["fs"]} hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } async-channel = "1.6" @@ -38,3 +38,4 @@ netpod = { path = "../netpod" } bitshuffle = { path = "../bitshuffle" } dbconn = { path = "../dbconn" } parse = { path = "../parse" } +items = { path = "../items" } diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index aa94fa3..bfdeb12 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -1,8 +1,8 @@ -use crate::agg::streams::{Appendable, StreamItem}; -use crate::binned::{FilterFittingInside, RangeCompletableItem, RangeOverlapInfo, ReadableFromFile}; -use crate::Sitemty; +use crate::agg::streams::Appendable; +use crate::binned::{FilterFittingInside, RangeOverlapInfo, ReadableFromFile}; use futures_core::Stream; use futures_util::StreamExt; +use items::{RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::{BinnedRange, NanoRange}; use serde::Serialize; diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index 1b24a6f..ac4be0f 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -8,6 +8,7 @@ use crate::binned::{ }; use crate::decode::EventValues; use err::Error; +use items::{SitemtyFrameType, SubFrId}; use netpod::log::*; use netpod::timeunits::{MS, SEC}; use netpod::{x_bin_count, AggKind, NanoRange, Shape}; @@ -44,6 +45,13 @@ pub struct XBinnedScalarEvents { avgs: Vec, } +impl SitemtyFrameType for XBinnedScalarEvents +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0x600 + NTY::SUB; +} + impl XBinnedScalarEvents { pub fn empty() -> Self { Self { @@ -381,6 +389,13 @@ pub struct XBinnedWaveEvents { avgs: Vec>, } +impl SitemtyFrameType for XBinnedWaveEvents +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0x900 + NTY::SUB; +} + impl XBinnedWaveEvents { pub fn empty() -> Self { Self { @@ -719,6 +734,13 @@ pub struct WaveEvents { pub vals: Vec>, } +impl SitemtyFrameType for WaveEvents +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0x800 + NTY::SUB; +} + impl WaveEvents { pub fn empty() -> Self { Self { diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 3ab4b0f..aed52f7 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -1,8 +1,9 @@ -use crate::agg::streams::{Appendable, StreamItem}; -use crate::binned::{MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo}; +use crate::agg::streams::Appendable; +use crate::binned::{MakeBytesFrame, RangeOverlapInfo}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; +use items::{RangeCompletableItem, SitemtyFrameType, StreamItem}; use netpod::log::*; use netpod::NanoRange; use serde::{Deserialize, Serialize}; @@ -16,6 +17,10 
@@ pub struct MinMaxAvgScalarEventBatch { pub avgs: Vec, } +impl SitemtyFrameType for MinMaxAvgScalarEventBatch { + const FRAME_TYPE_ID: u32 = 0x300; +} + impl MinMaxAvgScalarEventBatch { pub fn empty() -> Self { Self { diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 27803ee..1a15eb8 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -1,9 +1,10 @@ -use crate::agg::streams::{Appendable, StreamItem, ToJsonBytes}; +use crate::agg::streams::{Appendable, ToJsonBytes}; use crate::agg::{Fits, FitsInside}; -use crate::binned::{MakeBytesFrame, RangeCompletableItem}; +use crate::binned::MakeBytesFrame; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; +use items::{RangeCompletableItem, SitemtyFrameType, StreamItem}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; @@ -21,6 +22,10 @@ pub struct MinMaxAvgScalarBinBatch { pub avgs: Vec, } +impl SitemtyFrameType for MinMaxAvgScalarBinBatch { + const FRAME_TYPE_ID: u32 = 0x200; +} + impl MinMaxAvgScalarBinBatch { pub fn empty() -> Self { Self { diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index 025258e..a512cd0 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -1,20 +1,6 @@ use crate::binned::WithLen; -use crate::streamlog::LogItem; use err::Error; -use netpod::EventDataReadStats; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Serialize, Deserialize)] -pub enum StatsItem { - EventDataReadStats(EventDataReadStats), -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum StreamItem { - DataItem(T), - Log(LogItem), - Stats(StatsItem), -} +use serde::Serialize; pub trait Collector: Send + Unpin + WithLen { type Input: Collectable; diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 7e98bd8..3ec9582 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -2,23 +2,22 @@ use crate::agg::binnedt::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggre use crate::agg::enp::ts_offs_from_abs; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem, ToJsonBytes, ToJsonResult}; +use crate::agg::streams::{Appendable, Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::agg::{Fits, FitsInside}; use crate::binned::binnedfrompbv::BinnedFromPreBinned; use crate::binned::query::BinnedQuery; use crate::binnedstream::BoxedStream; use crate::channelexec::{channel_exec, collect_plain_events_json, ChannelExecFunction}; use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, EventValues, NumFromBytes}; -use crate::frame::makeframe::{Framable, FrameType, SubFrId}; use crate::merge::mergedfromremotes::MergedFromRemotes; -use crate::raw::RawEventsQuery; -use crate::Sitemty; use bytes::Bytes; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::{Framable, FrameType, RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem, SubFrId}; use netpod::log::*; +use netpod::query::RawEventsQuery; use netpod::timeunits::SEC; use netpod::{ x_bin_count, AggKind, BinnedRange, BoolNum, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, @@ -686,6 +685,13 @@ pub struct MinMaxAvgBins { pub avgs: Vec>, } +impl SitemtyFrameType for MinMaxAvgBins +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0x700 + NTY::SUB; +} + impl fmt::Debug for MinMaxAvgBins where NTY: fmt::Debug, @@ 
-1162,12 +1168,6 @@ where } } -#[derive(Debug, Serialize, Deserialize)] -pub enum RangeCompletableItem { - RangeComplete, - Data(T), -} - #[derive(Serialize, Deserialize)] pub struct MinMaxAvgWaveBins { pub ts1s: Vec, @@ -1178,6 +1178,13 @@ pub struct MinMaxAvgWaveBins { pub avgs: Vec>>, } +impl SitemtyFrameType for MinMaxAvgWaveBins +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0xa00 + NTY::SUB; +} + impl fmt::Debug for MinMaxAvgWaveBins where NTY: fmt::Debug, diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 5396441..b121d20 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -1,15 +1,13 @@ use crate::agg::binnedt::{TBinnerStream, TimeBinnableType}; -use crate::agg::streams::StreamItem; use crate::binned::query::{CacheUsage, PreBinnedQuery}; -use crate::binned::RangeCompletableItem; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::{decode_frame, FrameType}; -use crate::Sitemty; +use crate::frame::makeframe::decode_frame; use err::Error; use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use http::{StatusCode, Uri}; +use items::{FrameType, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::{ x_bin_count, AggKind, AppendToUrl, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, diff --git a/disk/src/binned/dim1.rs b/disk/src/binned/dim1.rs index 6fef183..0df1e08 100644 --- a/disk/src/binned/dim1.rs +++ b/disk/src/binned/dim1.rs @@ -5,9 +5,9 @@ use crate::agg::{Fits, FitsInside}; use crate::binned::{ Bool, FilterFittingInside, IsoDateTime, NumOps, RangeOverlapInfo, ReadPbv, ReadableFromFile, TimeBins, WithLen, }; -use crate::Sitemty; use chrono::{TimeZone, Utc}; use err::Error; +use items::{Sitemty, SitemtyFrameType, SubFrId}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; @@ -27,6 +27,13 @@ pub struct MinMaxAvgDim1Bins { pub avgs: Vec>>, } +impl SitemtyFrameType for MinMaxAvgDim1Bins +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0xb00 + NTY::SUB; +} + impl fmt::Debug for MinMaxAvgDim1Bins where NTY: fmt::Debug, diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index e2faf8d..c96cbf0 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -1,19 +1,18 @@ use crate::agg::binnedt::{TBinnerStream, TimeBinnableType}; -use crate::agg::streams::{Appendable, StreamItem}; +use crate::agg::streams::Appendable; use crate::binned::binnedfrompbv::FetchedPreBinned; use crate::binned::query::{CacheUsage, PreBinnedQuery}; -use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, ReadableFromFile, WithLen}; +use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, ReadableFromFile, WithLen}; use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, WrittenPbCache}; use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; -use crate::frame::makeframe::FrameType; use crate::merge::mergedfromremotes::MergedFromRemotes; -use crate::raw::RawEventsQuery; use crate::streamlog::Streamlog; -use crate::Sitemty; use err::Error; use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; +use items::{FrameType, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; +use netpod::query::RawEventsQuery; use netpod::{ x_bin_count, AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, Shape, }; diff --git 
a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 43a7f30..ca08370 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -8,12 +8,11 @@ use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, LittleEndian, NumFromBytes, }; -use crate::frame::makeframe::{Framable, FrameType}; -use crate::Sitemty; use bytes::Bytes; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::{Framable, FrameType, Sitemty}; use netpod::{AggKind, BoolNum, ByteOrder, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index d4c99ca..dffc875 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -1,21 +1,20 @@ use crate::agg::binnedt::TimeBinnableType; use crate::agg::enp::Identity; -use crate::agg::streams::{Collectable, Collector, StreamItem}; -use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem}; +use crate::agg::streams::{Collectable, Collector}; +use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex}; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, LittleEndian, NumFromBytes, }; -use crate::frame::makeframe::{Framable, FrameType}; use crate::merge::mergedfromremotes::MergedFromRemotes; -use crate::raw::RawEventsQuery; -use crate::Sitemty; use bytes::Bytes; use err::Error; use futures_core::Stream; use futures_util::future::FutureExt; use futures_util::StreamExt; +use items::{Framable, FrameType, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; +use netpod::query::RawEventsQuery; use netpod::{AggKind, BoolNum, ByteOrder, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; diff --git a/disk/src/decode.rs b/disk/src/decode.rs index a05d6ca..86921cf 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,16 +1,17 @@ use crate::agg::binnedt::TimeBinnableType; use crate::agg::enp::{ts_offs_from_abs, Identity, WaveNBinner, WavePlainProc, WaveXBinner}; -use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem}; +use crate::agg::streams::{Appendable, Collectable, Collector}; use crate::agg::{Fits, FitsInside}; use crate::binned::{ Bool, EventValuesAggregator, EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, - RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, + RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, }; use crate::eventblobs::EventChunkerMultifile; use crate::eventchunker::EventFull; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::{RangeCompletableItem, SitemtyFrameType, StreamItem}; use netpod::{BoolNum, NanoRange}; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; @@ -190,6 +191,13 @@ pub struct EventValues { pub values: Vec, } +impl SitemtyFrameType for EventValues +where + NTY: NumOps, +{ + const FRAME_TYPE_ID: u32 = 0x500 + NTY::SUB; +} + impl EventValues { pub fn empty() -> Self { Self { diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 7440218..1a9038c 100644 --- a/disk/src/eventblobs.rs +++ 
b/disk/src/eventblobs.rs @@ -1,12 +1,10 @@ -use crate::agg::streams::StreamItem; -use crate::binned::RangeCompletableItem; use crate::dataopen::{open_files, OpenedFile}; use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull}; use crate::file_content_stream; -use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::{LogItem, RangeCompletableItem, StreamItem}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ChannelConfig, NanoRange, Node}; diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 30c0027..41bf2d9 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -1,11 +1,10 @@ -use crate::agg::streams::{StatsItem, StreamItem}; -use crate::binned::RangeCompletableItem; use crate::{FileChunkRead, NeedMinBuffer}; use bitshuffle::bitshuffle_decompress; use bytes::{Buf, BytesMut}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::{RangeCompletableItem, StatsItem, StreamItem}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape}; diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index 211c7c3..85ad46c 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -1,9 +1,9 @@ -use crate::agg::streams::StreamItem; -use crate::frame::makeframe::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; use futures_core::Stream; use futures_util::pin_mut; +use items::StreamItem; +use items::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; use netpod::log::*; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 939f4a4..1737253 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -1,256 +1,9 @@ -use crate::agg::enp::{WaveEvents, XBinnedScalarEvents, XBinnedWaveEvents}; -use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::StreamItem; -use crate::binned::dim1::MinMaxAvgDim1Bins; -use crate::binned::{MinMaxAvgBins, MinMaxAvgWaveBins, NumOps, RangeCompletableItem}; -use crate::decode::EventValues; use crate::frame::inmem::InMemoryFrame; -use crate::raw::EventQueryJsonStringFrame; -use crate::Sitemty; use bytes::{BufMut, BytesMut}; use err::Error; -use netpod::BoolNum; +use items::{FrameType, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; use serde::{de::DeserializeOwned, Serialize}; -pub const INMEM_FRAME_ENCID: u32 = 0x12121212; -pub const INMEM_FRAME_HEAD: usize = 20; -pub const INMEM_FRAME_FOOT: usize = 4; -pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; - -pub trait SubFrId { - const SUB: u32; -} - -impl SubFrId for u8 { - const SUB: u32 = 3; -} - -impl SubFrId for u16 { - const SUB: u32 = 5; -} - -impl SubFrId for u32 { - const SUB: u32 = 8; -} - -impl SubFrId for u64 { - const SUB: u32 = 10; -} - -impl SubFrId for i8 { - const SUB: u32 = 2; -} - -impl SubFrId for i16 { - const SUB: u32 = 4; -} - -impl SubFrId for i32 { - const SUB: u32 = 7; -} - -impl SubFrId for i64 { - const SUB: u32 = 9; -} - -impl SubFrId for f32 { - const SUB: u32 = 11; -} - -impl SubFrId for f64 { - const SUB: u32 = 12; -} - -impl SubFrId for BoolNum { - const SUB: u32 = 13; -} - -pub trait FrameType { - const FRAME_TYPE_ID: u32; -} - -impl FrameType for EventQueryJsonStringFrame { - const FRAME_TYPE_ID: u32 = 0x100; -} - 
-impl FrameType for Sitemty { - const FRAME_TYPE_ID: u32 = 0x200; -} - -impl FrameType for Sitemty { - const FRAME_TYPE_ID: u32 = 0x300; -} - -impl FrameType for Sitemty> -where - NTY: NumOps, -{ - const FRAME_TYPE_ID: u32 = 0x500 + NTY::SUB; -} - -impl FrameType for Sitemty> -where - NTY: SubFrId, -{ - const FRAME_TYPE_ID: u32 = 0x600 + NTY::SUB; -} - -impl FrameType for Sitemty> -where - NTY: SubFrId, -{ - const FRAME_TYPE_ID: u32 = 0x700 + NTY::SUB; -} - -impl FrameType for Sitemty> -where - NTY: SubFrId, -{ - const FRAME_TYPE_ID: u32 = 0x800 + NTY::SUB; -} - -impl FrameType for Sitemty> -where - NTY: SubFrId, -{ - const FRAME_TYPE_ID: u32 = 0x900 + NTY::SUB; -} - -impl FrameType for Sitemty> -where - NTY: SubFrId, -{ - const FRAME_TYPE_ID: u32 = 0xa00 + NTY::SUB; -} - -impl FrameType for Sitemty> -where - NTY: SubFrId, -{ - const FRAME_TYPE_ID: u32 = 0xb00 + NTY::SUB; -} - -pub trait ProvidesFrameType { - fn frame_type_id(&self) -> u32; -} - -pub trait Framable: Send { - fn typeid(&self) -> u32; - fn make_frame(&self) -> Result; -} - -impl Framable for Sitemty { - fn typeid(&self) -> u32 { - EventQueryJsonStringFrame::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - panic!() - } -} - -impl Framable for Result>, Error> { - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Result>, Error> { - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Result>>, err::Error> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Result>>, err::Error> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - -impl Framable for Sitemty> -where - NTY: NumOps + Serialize, -{ - fn typeid(&self) -> u32 { - Self::FRAME_TYPE_ID - } - fn make_frame(&self) -> Result { - make_frame(self) - } -} - pub fn make_frame(item: &FT) -> Result where FT: FrameType + Serialize, diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 2deb45b..4b9b762 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -1,5 +1,3 @@ -use crate::agg::streams::StreamItem; -use crate::binned::RangeCompletableItem; use crate::dataopen::open_files; use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; use bytes::{Bytes, BytesMut}; @@ -477,8 +475,6 @@ pub fn raw_concat_channel_read_stream_timebin( } } -pub type Sitemty = Result>, Error>; - pub mod dtflags { pub const COMPRESSION: u8 = 0x80; pub const ARRAY: u8 = 0x40; diff --git a/disk/src/merge.rs b/disk/src/merge.rs index f34a016..0bd69bb 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs 
@@ -1,10 +1,9 @@ -use crate::agg::streams::{Appendable, StatsItem, StreamItem}; -use crate::binned::{EventsNodeProcessor, PushableIndex, RangeCompletableItem, WithLen, WithTimestamps}; -use crate::streamlog::LogItem; -use crate::Sitemty; +use crate::agg::streams::Appendable; +use crate::binned::{EventsNodeProcessor, PushableIndex, WithLen, WithTimestamps}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::{LogItem, RangeCompletableItem, Sitemty, StatsItem, StreamItem}; use netpod::log::*; use netpod::EventDataReadStats; use std::collections::VecDeque; diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs index 4019910..992b60e 100644 --- a/disk/src/merge/mergedfromremotes.rs +++ b/disk/src/merge/mergedfromremotes.rs @@ -1,13 +1,13 @@ use crate::agg::streams::Appendable; use crate::binned::{EventsNodeProcessor, PushableIndex}; -use crate::frame::makeframe::FrameType; use crate::merge::MergedStream; -use crate::raw::{x_processed_stream_from_node, RawEventsQuery}; -use crate::Sitemty; +use crate::raw::client::x_processed_stream_from_node; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; +use items::{FrameType, Sitemty}; use netpod::log::*; +use netpod::query::RawEventsQuery; use netpod::{Cluster, PerfOpts}; use std::future::Future; use std::pin::Pin; diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 2e7926c..62ea3d7 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -1,61 +1,3 @@ -/*! -Delivers event data. - -Delivers event data (not yet time-binned) from local storage and provides client functions -to request such data from nodes. -*/ - -use crate::agg::streams::StreamItem; -use crate::binned::{EventsNodeProcessor, RangeCompletableItem}; -use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::{make_frame, make_term_frame, FrameType}; -use crate::raw::eventsfromframes::EventsFromFrames; -use crate::Sitemty; -use err::Error; -use futures_core::Stream; -use netpod::{AggKind, Channel, NanoRange, Node, PerfOpts}; -use serde::{Deserialize, Serialize}; -use std::pin::Pin; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpStream; - +pub mod client; pub mod conn; pub mod eventsfromframes; - -/** -Query parameters to request (optionally) X-processed, but not T-processed events. 
-*/
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct RawEventsQuery {
-    pub channel: Channel,
-    pub range: NanoRange,
-    pub agg_kind: AggKind,
-    pub disk_io_buffer_size: usize,
-}
-
-#[derive(Serialize, Deserialize)]
-pub struct EventQueryJsonStringFrame(String);
-
-pub async fn x_processed_stream_from_node<ENP>(
-    query: RawEventsQuery,
-    perf_opts: PerfOpts,
-    node: Node,
-) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>>, Error>
-where
-    ENP: EventsNodeProcessor,
-    <ENP as EventsNodeProcessor>::Output: Unpin + 'static,
-    Result<StreamItem<RangeCompletableItem<<ENP as EventsNodeProcessor>::Output>>, err::Error>: FrameType,
-{
-    let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
-    let qjs = serde_json::to_string(&query)?;
-    let (netin, mut netout) = net.into_split();
-    let buf = make_frame(&EventQueryJsonStringFrame(qjs))?;
-    netout.write_all(&buf).await?;
-    let buf = make_term_frame();
-    netout.write_all(&buf).await?;
-    netout.flush().await?;
-    netout.forget();
-    let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
-    let items = EventsFromFrames::new(frames);
-    Ok(Box::pin(items))
-}
diff --git a/disk/src/raw/client.rs b/disk/src/raw/client.rs
new file mode 100644
index 0000000..4367356
--- /dev/null
+++ b/disk/src/raw/client.rs
@@ -0,0 +1,43 @@
+/*!
+Delivers event data.
+
+Delivers event data (not yet time-binned) from local storage and provides client functions
+to request such data from nodes.
+ */
+
+use crate::binned::EventsNodeProcessor;
+use crate::frame::inmem::InMemoryFrameAsyncReadStream;
+use crate::frame::makeframe::{make_frame, make_term_frame};
+use crate::raw::eventsfromframes::EventsFromFrames;
+use err::Error;
+use futures_core::Stream;
+use items::{FrameType, RangeCompletableItem, Sitemty, StreamItem};
+use netpod::query::RawEventsQuery;
+use netpod::{EventQueryJsonStringFrame, Node, PerfOpts};
+use std::pin::Pin;
+use tokio::io::AsyncWriteExt;
+use tokio::net::TcpStream;
+
+pub async fn x_processed_stream_from_node<ENP>(
+    query: RawEventsQuery,
+    perf_opts: PerfOpts,
+    node: Node,
+) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>>, Error>
+where
+    ENP: EventsNodeProcessor,
+    <ENP as EventsNodeProcessor>::Output: Unpin + 'static,
+    Result<StreamItem<RangeCompletableItem<<ENP as EventsNodeProcessor>::Output>>, err::Error>: FrameType,
+{
+    let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
+    let qjs = serde_json::to_string(&query)?;
+    let (netin, mut netout) = net.into_split();
+    let buf = make_frame(&EventQueryJsonStringFrame(qjs))?;
+    netout.write_all(&buf).await?;
+    let buf = make_term_frame();
+    netout.write_all(&buf).await?;
+    netout.flush().await?;
+    netout.forget();
+    let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
+    let items = EventsFromFrames::new(frames);
+    Ok(Box::pin(items))
+}
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 435c9a9..93b59b7 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -1,100 +1,18 @@
-use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
-use crate::agg::streams::StreamItem;
-use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem};
+use crate::binned::{EventsNodeProcessor, NumOps};
 use crate::decode::{
     BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
     EventsDecodedStream, LittleEndian, NumFromBytes,
 };
 use crate::eventblobs::EventChunkerMultifile;
 use crate::eventchunker::EventChunkerConf;
-use crate::frame::inmem::InMemoryFrameAsyncReadStream;
-use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, Framable};
-use crate::raw::{EventQueryJsonStringFrame, RawEventsQuery};
-use crate::Sitemty;
 use err::Error;
 use
futures_core::Stream; use futures_util::StreamExt; -use netpod::log::*; -use netpod::{AggKind, BoolNum, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, ScalarType, Shape}; +use items::{Framable, RangeCompletableItem, Sitemty, StreamItem}; +use netpod::query::RawEventsQuery; +use netpod::{AggKind, BoolNum, ByteOrder, ByteSize, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; -use std::io; -use std::net::SocketAddr; use std::pin::Pin; -use tokio::io::AsyncWriteExt; -use tokio::net::tcp::OwnedWriteHalf; -use tokio::net::TcpStream; -use tracing::Instrument; - -pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> { - let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw); - let lis = tokio::net::TcpListener::bind(addr).await?; - loop { - match lis.accept().await { - Ok((stream, addr)) => { - taskrun::spawn(events_conn_handler(stream, addr, node_config.clone())); - } - Err(e) => Err(e)?, - } - } -} - -async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> { - //use tracing_futures::Instrument; - let span1 = span!(Level::INFO, "raw::raw_conn_handler"); - let r = events_conn_handler_inner(stream, addr, &node_config) - .instrument(span1) - .await; - match r { - Ok(k) => Ok(k), - Err(e) => { - error!("raw_conn_handler sees error: {:?}", e); - Err(e) - } - } -} - -async fn events_conn_handler_inner( - stream: TcpStream, - addr: SocketAddr, - node_config: &NodeConfigCached, -) -> Result<(), Error> { - match events_conn_handler_inner_try(stream, addr, node_config).await { - Ok(_) => (), - Err(mut ce) => { - error!("events_conn_handler_inner: {:?}", ce.err); - if false { - let buf = make_frame::>, Error>>( - &Err(ce.err), - )?; - match ce.netout.write_all(&buf).await { - Ok(_) => (), - Err(e) => match e.kind() { - io::ErrorKind::BrokenPipe => {} - _ => { - error!("events_conn_handler_inner sees: {:?}", e); - return Err(e)?; - } - }, - } - } - } - } - Ok(()) -} - -struct ConnErr { - err: Error, - netout: OwnedWriteHalf, -} - -impl> From<(E, OwnedWriteHalf)> for ConnErr { - fn from((err, netout): (E, OwnedWriteHalf)) -> Self { - Self { - err: err.into(), - netout, - } - } -} fn make_num_pipeline_stream_evs( event_value_shape: EVS, @@ -206,75 +124,38 @@ macro_rules! 
pipe1 { }; } -async fn events_conn_handler_inner_try( - stream: TcpStream, - addr: SocketAddr, +pub async fn make_event_pipe( + evq: &RawEventsQuery, node_config: &NodeConfigCached, -) -> Result<(), ConnErr> { - let _ = addr; - let (netin, mut netout) = stream.into_split(); - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); - let mut frames = vec![]; - while let Some(k) = h - .next() - .instrument(span!(Level::INFO, "raw_conn_handler INPUT STREAM READ")) - .await - { - match k { - Ok(StreamItem::DataItem(item)) => { - frames.push(item); - } - Ok(_) => {} - Err(e) => { - return Err((e, netout))?; - } - } - } - if frames.len() != 1 { - error!("missing command frame"); - return Err((Error::with_msg("missing command frame"), netout))?; - } - let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0]) { - Ok(k) => k, - Err(e) => return Err((e, netout).into()), - }; - let res: Result = serde_json::from_str(&qitem.0); - let evq = match res { - Ok(k) => k, - Err(e) => { - error!("json parse error: {:?}", e); - return Err((Error::with_msg("json parse error"), netout))?; - } - }; - info!("---------------------------------------------------\nevq {:?}", evq); +) -> Result> + Send>>, Error> { match dbconn::channel_exists(&evq.channel, &node_config).await { Ok(_) => (), - Err(e) => return Err((e, netout))?, + Err(e) => return Err(e)?, } let range = &evq.range; let channel_config = match read_local_config(&evq.channel, &node_config.node).await { Ok(k) => k, Err(e) => { if e.msg().contains("ErrorKind::NotFound") { - return Ok(()); + let s = futures_util::stream::empty(); + return Ok(Box::pin(s)); } else { - return Err((e, netout))?; + return Err(e)?; } } }; let entry_res = match extract_matching_config_entry(range, &channel_config) { Ok(k) => k, - Err(e) => return Err((e, netout))?, + Err(e) => return Err(e)?, }; let entry = match entry_res { - MatchingConfigEntry::None => return Err((Error::with_msg("no config entry found"), netout))?, - MatchingConfigEntry::Multiple => return Err((Error::with_msg("multiple config entries found"), netout))?, + MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, + MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?, MatchingConfigEntry::Entry(entry) => entry, }; let shape = match entry.to_shape() { Ok(k) => k, - Err(e) => return Err((e, netout))?, + Err(e) => return Err(e)?, }; let channel_config = netpod::ChannelConfig { channel: evq.channel.clone(), @@ -296,28 +177,12 @@ async fn events_conn_handler_inner_try( event_chunker_conf, ); let shape = entry.to_shape().unwrap(); - let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs); - while let Some(item) = p1.next().await { - //info!("conn.rs encode frame typeid {:x}", item.typeid()); - let item = item.make_frame(); - match item { - Ok(buf) => match netout.write_all(&buf).await { - Ok(_) => {} - Err(e) => return Err((e, netout))?, - }, - Err(e) => { - return Err((e, netout))?; - } - } - } - let buf = make_term_frame(); - match netout.write_all(&buf).await { - Ok(_) => (), - Err(e) => return Err((e, netout))?, - } - match netout.flush().await { - Ok(_) => (), - Err(e) => return Err((e, netout))?, - } - Ok(()) + let pipe = pipe1!( + entry.scalar_type, + entry.byte_order, + shape, + evq.agg_kind.clone(), + event_blobs + ); + Ok(pipe) } diff --git a/disk/src/raw/eventsfromframes.rs b/disk/src/raw/eventsfromframes.rs index 
a13412e..08942fa 100644 --- a/disk/src/raw/eventsfromframes.rs +++ b/disk/src/raw/eventsfromframes.rs @@ -1,9 +1,8 @@ -use crate::agg::streams::StreamItem; use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::{decode_frame, FrameType}; -use crate::Sitemty; +use crate::frame::makeframe::decode_frame; use futures_core::Stream; use futures_util::StreamExt; +use items::{FrameType, Sitemty, StreamItem}; use netpod::log::*; use serde::de::DeserializeOwned; use std::marker::PhantomData; diff --git a/disk/src/streamlog.rs b/disk/src/streamlog.rs index 83e36b1..5399715 100644 --- a/disk/src/streamlog.rs +++ b/disk/src/streamlog.rs @@ -1,88 +1,6 @@ +use items::LogItem; use netpod::log::*; -use serde::de::{Error, Visitor}; -use serde::{Deserialize, Serialize}; use std::collections::VecDeque; -use std::fmt::Formatter; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct LogItem { - node_ix: u32, - #[serde(with = "levelserde")] - level: Level, - msg: String, -} - -impl LogItem { - pub fn quick(level: Level, msg: String) -> Self { - Self { - level, - msg, - node_ix: 42, - } - } -} - -struct VisitLevel; - -impl<'de> Visitor<'de> for VisitLevel { - type Value = u32; - - fn expecting(&self, fmt: &mut Formatter) -> std::fmt::Result { - write!(fmt, "expect u32 Level code") - } - - fn visit_u32(self, v: u32) -> Result - where - E: Error, - { - Ok(v) - } -} - -mod levelserde { - use super::Level; - use crate::streamlog::VisitLevel; - use serde::{Deserializer, Serializer}; - - pub fn serialize(t: &Level, s: S) -> Result - where - S: Serializer, - { - let g = match *t { - Level::ERROR => 1, - Level::WARN => 2, - Level::INFO => 3, - Level::DEBUG => 4, - Level::TRACE => 5, - }; - s.serialize_u32(g) - } - - pub fn deserialize<'de, D>(d: D) -> Result - where - D: Deserializer<'de>, - { - match d.deserialize_u32(VisitLevel) { - Ok(level) => { - let g = if level == 1 { - Level::ERROR - } else if level == 2 { - Level::WARN - } else if level == 3 { - Level::INFO - } else if level == 4 { - Level::DEBUG - } else if level == 5 { - Level::TRACE - } else { - Level::TRACE - }; - Ok(g) - } - Err(e) => Err(e), - } - } -} pub struct Streamlog { items: VecDeque, diff --git a/err/Cargo.toml b/err/Cargo.toml index 0251dc6..6afa0f8 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] } http = "0.2" -tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } backtrace = "0.3.56" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 33b5ccf..d3e6fcb 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -9,7 +9,7 @@ serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } http = "0.2" url = "2.2" -tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } hyper-tls = { version="0.5.0" } bytes = "1.0.1" @@ -24,5 +24,6 @@ dbconn = { path = "../dbconn" } disk = { path = "../disk" } parse = { path = "../parse" } netfetch = { path = "../netfetch" } -archapp = { path = "../archapp" } +archapp_wrap = 
{ path = "../archapp_wrap" } +nodenet = { path = "../nodenet" } taskrun = { path = "../taskrun" } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 3ac42e9..a631453 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -3,7 +3,6 @@ use bytes::Bytes; use disk::binned::prebinned::pre_binned_bytes_for_http; use disk::binned::query::{BinnedQuery, PreBinnedQuery}; use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery}; -use disk::raw::conn::events_service; use err::Error; use future::Future; use futures_core::Stream; @@ -13,7 +12,11 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::log::*; -use netpod::{get_url_query_pairs, AggKind, Channel, FromUrl, NodeConfigCached, APP_JSON, APP_JSON_LINES, APP_OCTET}; +use netpod::{ + channel_from_pairs, get_url_query_pairs, AggKind, Channel, FromUrl, NodeConfigCached, APP_JSON, APP_JSON_LINES, + APP_OCTET, +}; +use nodenet::conn::events_service; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; use serde::{Deserialize, Serialize}; @@ -223,6 +226,12 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path == "/api/4/archapp/channel/info" { + if req.method() == Method::GET { + Ok(archapp_channel_info(req, &node_config).await?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } } else if path == "/api/4/channel/config" { if req.method() == Method::GET { Ok(channel_config(req, &node_config).await?) @@ -634,7 +643,7 @@ pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> pub async fn archapp_scan_files(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); - let res = archapp::parse::scan_files(pairs, node_config.clone()).await?; + let res = archapp_wrap::scan_files(pairs, node_config.clone()).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON_LINES) .body(Body::wrap_stream(res.map(|k| match k { @@ -655,3 +664,15 @@ pub async fn archapp_scan_files(req: Request, node_config: &NodeConfigCach })))?; Ok(ret) } + +pub async fn archapp_channel_info(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let pairs = get_url_query_pairs(&url); + let channel = channel_from_pairs(&pairs)?; + let res = archapp_wrap::channel_info(&channel, node_config).await?; + let buf = serde_json::to_vec(&res)?; + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(Body::from(buf))?; + Ok(ret) +} diff --git a/items/Cargo.toml b/items/Cargo.toml new file mode 100644 index 0000000..276c22c --- /dev/null +++ b/items/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "items" +version = "0.0.1-a.dev.4" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +bytes = "1.0.1" +err = { path = "../err" } +netpod = { path = "../netpod" } diff --git a/items/src/lib.rs b/items/src/lib.rs new file mode 100644 index 0000000..a70b570 --- /dev/null +++ b/items/src/lib.rs @@ -0,0 +1,315 @@ +use bytes::BytesMut; +use err::Error; +use netpod::{log::Level, BoolNum, EventDataReadStats, EventQueryJsonStringFrame}; +use serde::de::{self, Visitor}; +use serde::{Deserialize, Serialize}; +use 
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum RangeCompletableItem<T> {
+    RangeComplete,
+    Data(T),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum StatsItem {
+    EventDataReadStats(EventDataReadStats),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum StreamItem<T> {
+    DataItem(T),
+    Log(LogItem),
+    Stats(StatsItem),
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct LogItem {
+    pub node_ix: u32,
+    #[serde(with = "levelserde")]
+    pub level: Level,
+    pub msg: String,
+}
+
+impl LogItem {
+    pub fn quick(level: Level, msg: String) -> Self {
+        Self {
+            level,
+            msg,
+            node_ix: 42,
+        }
+    }
+}
+
+pub type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, Error>;
+
+struct VisitLevel;
+
+impl<'de> Visitor<'de> for VisitLevel {
+    type Value = u32;
+
+    fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        write!(fmt, "expect u32 Level code")
+    }
+
+    fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
+    where
+        E: de::Error,
+    {
+        Ok(v)
+    }
+}
+
+mod levelserde {
+    use super::Level;
+    use super::VisitLevel;
+    use serde::{Deserializer, Serializer};
+
+    pub fn serialize<S>(t: &Level, s: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let g = match *t {
+            Level::ERROR => 1,
+            Level::WARN => 2,
+            Level::INFO => 3,
+            Level::DEBUG => 4,
+            Level::TRACE => 5,
+        };
+        s.serialize_u32(g)
+    }
+
+    pub fn deserialize<'de, D>(d: D) -> Result<Level, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        match d.deserialize_u32(VisitLevel) {
+            Ok(level) => {
+                let g = if level == 1 {
+                    Level::ERROR
+                } else if level == 2 {
+                    Level::WARN
+                } else if level == 3 {
+                    Level::INFO
+                } else if level == 4 {
+                    Level::DEBUG
+                } else if level == 5 {
+                    Level::TRACE
+                } else {
+                    Level::TRACE
+                };
+                Ok(g)
+            }
+            Err(e) => Err(e),
+        }
+    }
+}
+
+pub const INMEM_FRAME_ENCID: u32 = 0x12121212;
+pub const INMEM_FRAME_HEAD: usize = 20;
+pub const INMEM_FRAME_FOOT: usize = 4;
+pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d;
+
+pub trait SubFrId {
+    const SUB: u32;
+}
+
+impl SubFrId for u8 {
+    const SUB: u32 = 3;
+}
+
+impl SubFrId for u16 {
+    const SUB: u32 = 5;
+}
+
+impl SubFrId for u32 {
+    const SUB: u32 = 8;
+}
+
+impl SubFrId for u64 {
+    const SUB: u32 = 10;
+}
+
+impl SubFrId for i8 {
+    const SUB: u32 = 2;
+}
+
+impl SubFrId for i16 {
+    const SUB: u32 = 4;
+}
+
+impl SubFrId for i32 {
+    const SUB: u32 = 7;
+}
+
+impl SubFrId for i64 {
+    const SUB: u32 = 9;
+}
+
+impl SubFrId for f32 {
+    const SUB: u32 = 11;
+}
+
+impl SubFrId for f64 {
+    const SUB: u32 = 12;
+}
+
+impl SubFrId for BoolNum {
+    const SUB: u32 = 13;
+}
+
+pub trait SitemtyFrameType {
+    const FRAME_TYPE_ID: u32;
+}
+
+pub trait FrameType {
+    const FRAME_TYPE_ID: u32;
+}
+
+impl FrameType for EventQueryJsonStringFrame {
+    const FRAME_TYPE_ID: u32 = 0x100;
+}
+
+impl<T> FrameType for Sitemty<T>
+where
+    T: SitemtyFrameType,
+{
+    const FRAME_TYPE_ID: u32 = T::FRAME_TYPE_ID;
+}
+
+pub trait ProvidesFrameType {
+    fn frame_type_id(&self) -> u32;
+}
+
+pub trait Framable: Send {
+    fn typeid(&self) -> u32;
+    fn make_frame(&self) -> Result<BytesMut, Error>;
+}
+
+// TODO also need Framable for those types defined in other crates.
+impl<T> Framable for Sitemty<T>
+where
+    T: SitemtyFrameType + Send,
+{
+    fn typeid(&self) -> u32 {
+        todo!()
+    }
+
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        todo!()
+    }
+}
+
+/*
+
+impl Framable for Sitemty<EventQueryJsonStringFrame> {
+    fn typeid(&self) -> u32 {
+        EventQueryJsonStringFrame::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        panic!()
+    }
+}
+
+impl Framable for Result>, Error> {
+    fn typeid(&self) -> u32 {
+        Self::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
+impl Framable for Result>, Error> {
+    fn typeid(&self) -> u32 {
+        Self::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
+impl Framable for Result>>, err::Error>
+where
+    NTY: NumOps + Serialize,
+{
+    fn typeid(&self) -> u32 {
+        Self::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
+impl Framable for Result>>, err::Error>
+where
+    NTY: NumOps + Serialize,
+{
+    fn typeid(&self) -> u32 {
+        Self::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
+impl Framable for Sitemty>
+where
+    NTY: NumOps + Serialize,
+{
+    fn typeid(&self) -> u32 {
+        Self::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
+impl Framable for Sitemty>
+where
+    NTY: NumOps + Serialize,
+{
+    fn typeid(&self) -> u32 {
+        Self::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
+impl Framable for Sitemty>
+where
+    NTY: NumOps + Serialize,
+{
+    fn typeid(&self) -> u32 {
+        Self::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
+impl Framable for Sitemty>
+where
+    NTY: NumOps + Serialize,
+{
+    fn typeid(&self) -> u32 {
+        Self::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
+impl Framable for Sitemty>
+where
+    NTY: NumOps + Serialize,
+{
+    fn typeid(&self) -> u32 {
+        Self::FRAME_TYPE_ID
+    }
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+*/
diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml
index d6b85de..99a819d 100644
--- a/netfetch/Cargo.toml
+++ b/netfetch/Cargo.toml
@@ -8,7 +8,7 @@ edition = "2018"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 serde_cbor = "0.11.1"
-tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
+tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
 tokio-stream = {version = "0.1.5", features = ["fs"]}
 async-channel = "1.6"
 bytes = "1.0.1"
diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml
index 63f3dfb..e340b31 100644
--- a/netpod/Cargo.toml
+++ b/netpod/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2018"
 
 [dependencies]
 serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
 async-channel = "1.6"
 bytes = "1.0.1"
 chrono = { version = "0.4.19", features = ["serde"] }
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 42b738c..444484c 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -1,8 +1,3 @@
-use chrono::{DateTime, TimeZone, Utc};
-use err::Error;
-use futures_core::Stream;
-use futures_util::StreamExt;
-use serde::{Deserialize, Serialize};
 use std::cmp::Ordering;
 use std::collections::BTreeMap;
 use std::fmt::{self, Debug, Display, Formatter};
@@ -13,11 +8,19 @@ use std::pin::Pin;
 use std::str::FromStr;
 use std::task::{Context, Poll};
 use std::time::Duration;
-use timeunits::*;
+
+use chrono::{DateTime, TimeZone, Utc};
+use futures_core::Stream;
+use futures_util::StreamExt;
+use serde::{Deserialize, Serialize};
 
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
 use url::Url;
+use err::Error;
+use timeunits::*;
+
+pub mod query;
 
 pub mod status;
 pub mod streamext;
@@ -1057,3 +1060,15 @@ impl AppendToUrl for ChannelConfigQuery {
         g.append_pair("channelName", &self.channel.name);
     }
 }
+
+#[derive(Serialize, Deserialize)]
+pub struct EventQueryJsonStringFrame(pub String);
+
+/**
+Provide basic information about a channel, especially its shape.
+*/
+#[derive(Serialize, Deserialize)]
+pub struct ChannelInfo {
+    pub shape: Shape,
+    pub msg: String,
+}
diff --git a/netpod/src/query.rs b/netpod/src/query.rs
new file mode 100644
index 0000000..9ebb75a
--- /dev/null
+++ b/netpod/src/query.rs
@@ -0,0 +1,13 @@
+use crate::{AggKind, Channel, NanoRange};
+use serde::{Deserialize, Serialize};
+
+/**
+Query parameters to request (optionally) X-processed, but not T-processed events.
+*/
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct RawEventsQuery {
+    pub channel: Channel,
+    pub range: NanoRange,
+    pub agg_kind: AggKind,
+    pub disk_io_buffer_size: usize,
+}
diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml
new file mode 100644
index 0000000..9f3888c
--- /dev/null
+++ b/nodenet/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "nodenet"
+version = "0.0.1-a.1"
+authors = ["Dominik Werder "]
+edition = "2018"
+
+[dependencies]
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+serde_cbor = "0.11.1"
+chrono = { version = "0.4.19", features = ["serde"] }
+tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
+tokio-stream = {version = "0.1.5", features = ["fs"]}
+async-channel = "1.6"
+bytes = "1.0.1"
+crc32fast = "1.2.1"
+arrayref = "0.3.6"
+byteorder = "1.4.3"
+futures-core = "0.3.14"
+futures-util = "0.3.14"
+tracing = "0.1.25"
+hex = "0.4.3"
+err = { path = "../err" }
+netpod = { path = "../netpod" }
+disk = { path = "../disk" }
+archapp_wrap = { path = "../archapp_wrap" }
+#parse = { path = "../parse" }
+items = { path = "../items" }
+taskrun = { path = "../taskrun" }
diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs
new file mode 100644
index 0000000..439de4c
--- /dev/null
+++ b/nodenet/src/conn.rs
@@ -0,0 +1,154 @@
+// TODO move these frame-related things out of crate disk. Probably better into `nodenet`.
+use disk::frame::inmem::InMemoryFrameAsyncReadStream;
+use disk::frame::makeframe::{decode_frame, make_term_frame};
+use err::Error;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use items::{Framable, StreamItem};
+use netpod::log::*;
+use netpod::query::RawEventsQuery;
+use netpod::{EventQueryJsonStringFrame, NodeConfigCached, PerfOpts};
+use std::net::SocketAddr;
+use std::pin::Pin;
+use tokio::io::AsyncWriteExt;
+use tokio::net::tcp::OwnedWriteHalf;
+use tokio::net::TcpStream;
+use tracing::Instrument;
+
+pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> {
+    let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw);
+    let lis = tokio::net::TcpListener::bind(addr).await?;
+    loop {
+        match lis.accept().await {
+            Ok((stream, addr)) => {
+                taskrun::spawn(events_conn_handler(stream, addr, node_config.clone()));
+            }
+            Err(e) => Err(e)?,
+        }
+    }
+}
+
+async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> {
+    //use tracing_futures::Instrument;
+    let span1 = span!(Level::INFO, "raw::raw_conn_handler");
+    let r = events_conn_handler_inner(stream, addr, &node_config)
+        .instrument(span1)
+        .await;
+    match r {
+        Ok(k) => Ok(k),
+        Err(e) => {
+            error!("raw_conn_handler sees error: {:?}", e);
+            Err(e)
+        }
+    }
+}
+
+async fn events_conn_handler_inner(
+    stream: TcpStream,
+    addr: SocketAddr,
+    node_config: &NodeConfigCached,
+) -> Result<(), Error> {
+    match events_conn_handler_inner_try(stream, addr, node_config).await {
+        Ok(_) => (),
+        Err(ce) => {
+            error!("events_conn_handler_inner: {:?}", ce.err);
+        }
+    }
+    Ok(())
+}
+
+struct ConnErr {
+    err: Error,
+    #[allow(dead_code)]
+    netout: OwnedWriteHalf,
+}
+
+impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
+    fn from((err, netout): (E, OwnedWriteHalf)) -> Self {
+        Self {
+            err: err.into(),
+            netout,
+        }
+    }
+}
+
+async fn events_conn_handler_inner_try(
+    stream: TcpStream,
+    addr: SocketAddr,
+    node_config: &NodeConfigCached,
+) -> Result<(), ConnErr> {
+    let _ = addr;
+    let (netin, mut netout) = stream.into_split();
+    let perf_opts = PerfOpts { inmem_bufcap: 512 };
+    let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
+    let mut frames = vec![];
+    while let Some(k) = h
+        .next()
+        .instrument(span!(Level::INFO, "raw_conn_handler INPUT STREAM READ"))
+        .await
+    {
+        match k {
+            Ok(StreamItem::DataItem(item)) => {
+                frames.push(item);
+            }
+            Ok(_) => {}
+            Err(e) => {
+                return Err((e, netout))?;
+            }
+        }
+    }
+    if frames.len() != 1 {
+        error!("missing command frame");
+        return Err((Error::with_msg("missing command frame"), netout))?;
+    }
+    let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0]) {
+        Ok(k) => k,
+        Err(e) => return Err((e, netout).into()),
+    };
+    let res: Result<RawEventsQuery, _> = serde_json::from_str(&qitem.0);
+    let evq = match res {
+        Ok(k) => k,
+        Err(e) => {
+            error!("json parse error: {:?}", e);
+            return Err((Error::with_msg("json parse error"), netout))?;
+        }
+    };
+    info!("---------------------------------------------------\nevq {:?}", evq);
+
+    let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> =
+        if let Some(aa) = &node_config.node.archiver_appliance {
+            match archapp_wrap::make_event_pipe(&evq, aa).await {
+                Ok(j) => j,
+                Err(e) => return Err((e, netout))?,
+            }
+        } else {
+            match disk::raw::conn::make_event_pipe(&evq, node_config).await {
+                Ok(j) => j,
+                Err(e) => return Err((e, netout))?,
+            }
+        };
+
+    while let Some(item) = p1.next().await {
+        //info!("conn.rs encode frame typeid {:x}", item.typeid());
{:x}", item.typeid()); + let item = item.make_frame(); + match item { + Ok(buf) => match netout.write_all(&buf).await { + Ok(_) => {} + Err(e) => return Err((e, netout))?, + }, + Err(e) => { + return Err((e, netout))?; + } + } + } + let buf = make_term_frame(); + match netout.write_all(&buf).await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, + } + match netout.flush().await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, + } + Ok(()) +} diff --git a/nodenet/src/lib.rs b/nodenet/src/lib.rs new file mode 100644 index 0000000..80ac2a1 --- /dev/null +++ b/nodenet/src/lib.rs @@ -0,0 +1 @@ +pub mod conn; diff --git a/parse/Cargo.toml b/parse/Cargo.toml index ed8bb03..300b425 100644 --- a/parse/Cargo.toml +++ b/parse/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } chrono = { version = "0.4.19", features = ["serde"] } bytes = "1.0.1" byteorder = "1.4.3" diff --git a/taskrun/Cargo.toml b/taskrun/Cargo.toml index bf3bbf3..7456bab 100644 --- a/taskrun/Cargo.toml +++ b/taskrun/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Dominik Werder "] edition = "2018" [dependencies] -tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } tracing = "0.1.25" tracing-subscriber = "0.2.17" backtrace = "0.3.56"