From 8c7dbf9ed312879eefc9dde04a72253b2195d995 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 5 Nov 2021 21:22:23 +0100 Subject: [PATCH] Refactor --- archapp/Cargo.toml | 1 + archapp/src/archeng.rs | 181 +----------------- archapp/src/archeng/backreadbuf.rs | 2 +- archapp/src/archeng/blockrefstream.rs | 9 +- archapp/src/archeng/blockstream.rs | 8 +- archapp/src/archeng/configs.rs | 8 +- archapp/src/archeng/datablock.rs | 12 +- archapp/src/archeng/datablockstream.rs | 6 +- archapp/src/archeng/indexfiles.rs | 2 +- archapp/src/archeng/indextree.rs | 10 +- archapp/src/events.rs | 6 +- archapp/src/lib.rs | 3 - archapp/src/parse.rs | 4 +- archapp/src/storagemerge.rs | 3 +- commonio/Cargo.toml | 27 +++ commonio/src/commonio.rs | 173 +++++++++++++++++ .../src/archeng => commonio/src}/ringbuf.rs | 2 +- err/src/lib.rs | 7 + fsio/src/fsio.rs | 10 + httpret/Cargo.toml | 3 + httpret/src/channelarchiver.rs | 5 +- httpret/src/{lib.rs => httpret.rs} | 0 httpret/src/search.rs | 6 +- {archapp => items}/src/binnedevents.rs | 7 +- {archapp => items}/src/eventsitem.rs | 9 +- items/src/lib.rs | 27 +-- {archapp => items}/src/plainevents.rs | 6 +- netfetch/Cargo.toml | 5 +- netfetch/src/bsread.rs | 100 ++++++++++ netfetch/src/{lib.rs => netfetch.rs} | 1 + netfetch/src/zmtp.rs | 79 ++++++-- netpod/Cargo.toml | 3 + netpod/src/{lib.rs => netpod.rs} | 62 +++++- 33 files changed, 527 insertions(+), 260 deletions(-) create mode 100644 commonio/Cargo.toml create mode 100644 commonio/src/commonio.rs rename {archapp/src/archeng => commonio/src}/ringbuf.rs (98%) rename httpret/src/{lib.rs => httpret.rs} (100%) rename {archapp => items}/src/binnedevents.rs (98%) rename {archapp => items}/src/eventsitem.rs (96%) rename {archapp => items}/src/plainevents.rs (98%) create mode 100644 netfetch/src/bsread.rs rename netfetch/src/{lib.rs => netfetch.rs} (76%) rename netpod/src/{lib.rs => netpod.rs} (94%) diff --git a/archapp/Cargo.toml b/archapp/Cargo.toml index bb66cf3..283c077 100644 --- a/archapp/Cargo.toml +++ b/archapp/Cargo.toml @@ -28,6 +28,7 @@ netpod = { path = "../netpod" } dbconn = { path = "../dbconn" } items = { path = "../items" } streams = { path = "../streams" } +commonio = { path = "../commonio" } [features] default = ["devread"] diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index 7970998..32dd6ef 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -9,189 +9,23 @@ pub mod diskio; pub mod indexfiles; pub mod indextree; pub mod pipe; -pub mod ringbuf; use self::indexfiles::list_index_files; use self::indextree::channel_list; -use crate::eventsitem::EventsItem; use crate::timed::Timed; use crate::wrap_task; use async_channel::{Receiver, Sender}; +use commonio::StatsChannel; use err::Error; use futures_util::StreamExt; -use items::{Sitemty, StatsItem, StreamItem, WithLen}; +use items::{StreamItem, WithLen}; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::{ - ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, DiskStats, OpenStats, ReadExactStats, ReadStats, - SeekStats, -}; +use netpod::{ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse}; use serde::Serialize; use std::convert::TryInto; -use std::fmt; -use std::io::{self, SeekFrom}; -use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Instant; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncSeekExt}; - -/* -struct ReadExactWrap<'a> { - fut: &'a mut dyn Future>, -} - -trait TimedIo { - fn read_exact<'a, F>(&'a mut self, buf: &'a mut [u8]) -> ReadExactWrap - where - Self: Unpin; -} - -impl TimedIo for File { - fn read_exact<'a, F>(&'a mut self, buf: &'a mut [u8]) -> ReadExactWrap - where - Self: Unpin, - { - let fut = tokio::io::AsyncReadExt::read_exact(self, buf); - ReadExactWrap { fut: Box::pin(fut) } - } -} -*/ const EPICS_EPOCH_OFFSET: u64 = 631152000 * SEC; -const LOG_IO: bool = true; -const STATS_IO: bool = true; -static CHANNEL_SEND_ERROR: AtomicUsize = AtomicUsize::new(0); - -fn channel_send_error() { - let c = CHANNEL_SEND_ERROR.fetch_add(1, Ordering::AcqRel); - if c < 10 { - error!("CHANNEL_SEND_ERROR {}", c); - } -} - -pub struct StatsChannel { - chn: Sender>, -} - -impl fmt::Debug for StatsChannel { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("StatsChannel").finish() - } -} - -impl StatsChannel { - pub fn new(chn: Sender>) -> Self { - Self { chn } - } - - pub fn dummy() -> Self { - let (tx, rx) = async_channel::bounded(2); - taskrun::spawn(async move { - let mut rx = rx; - while let Some(_) = rx.next().await {} - }); - Self::new(tx) - } - - pub async fn send(&self, item: StatsItem) -> Result<(), Error> { - Ok(self.chn.send(Ok(StreamItem::Stats(item))).await?) - } -} - -impl Clone for StatsChannel { - fn clone(&self) -> Self { - Self { chn: self.chn.clone() } - } -} - -pub async fn open_read(path: PathBuf, stats: &StatsChannel) -> io::Result { - let ts1 = Instant::now(); - let res = OpenOptions::new().read(true).open(path).await; - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1); - if LOG_IO { - let dt = dt.as_secs_f64() * 1e3; - debug!("timed open_read dt: {:.3} ms", dt); - } - if STATS_IO { - if let Err(_) = stats - .send(StatsItem::DiskStats(DiskStats::OpenStats(OpenStats::new( - ts2.duration_since(ts1), - )))) - .await - { - channel_send_error(); - } - } - res -} - -async fn seek(file: &mut File, pos: SeekFrom, stats: &StatsChannel) -> io::Result { - let ts1 = Instant::now(); - let res = file.seek(pos).await; - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1); - if LOG_IO { - let dt = dt.as_secs_f64() * 1e3; - debug!("timed seek dt: {:.3} ms", dt); - } - if STATS_IO { - if let Err(_) = stats - .send(StatsItem::DiskStats(DiskStats::SeekStats(SeekStats::new( - ts2.duration_since(ts1), - )))) - .await - { - channel_send_error(); - } - } - res -} - -async fn read(file: &mut File, buf: &mut [u8], stats: &StatsChannel) -> io::Result { - let ts1 = Instant::now(); - let res = file.read(buf).await; - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1); - if LOG_IO { - let dt = dt.as_secs_f64() * 1e3; - debug!("timed read dt: {:.3} ms res: {:?}", dt, res); - } - if STATS_IO { - if let Err(_) = stats - .send(StatsItem::DiskStats(DiskStats::ReadStats(ReadStats::new( - ts2.duration_since(ts1), - )))) - .await - { - channel_send_error(); - } - } - res -} - -async fn read_exact(file: &mut File, buf: &mut [u8], stats: &StatsChannel) -> io::Result { - let ts1 = Instant::now(); - let res = file.read_exact(buf).await; - let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1); - if LOG_IO { - let dt = dt.as_secs_f64() * 1e3; - debug!("timed read_exact dt: {:.3} ms res: {:?}", dt, res); - } - if STATS_IO { - if let Err(_) = stats - .send(StatsItem::DiskStats(DiskStats::ReadExactStats(ReadExactStats::new( - ts2.duration_since(ts1), - )))) - .await - { - channel_send_error(); - }; - } - res -} pub fn name_hash(s: &str, ht_len: u32) -> u32 { let mut h = 0; @@ -334,7 +168,6 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R let stream = Box::pin(stream); let stream = blockstream::BlockStream::new(stream, q.range.clone(), 1); let mut stream = stream; - let timed_expand = Timed::new("channel_config EXPAND"); while let Some(item) = stream.next().await { use blockstream::BlockItem::*; match item { @@ -346,7 +179,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R } } JsVal(jsval) => { - if false { + if true { info!("jsval: {}", serde_json::to_string(&jsval)?); } } @@ -357,7 +190,6 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R } } } - drop(timed_expand); if type_info.is_none() { let timed_normal = Timed::new("channel_config NORMAL"); warn!("channel_config expand mode returned none"); @@ -376,7 +208,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R } } JsVal(jsval) => { - if false { + if true { info!("jsval: {}", serde_json::to_string(&jsval)?); } } @@ -406,7 +238,8 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R mod test { use crate::archeng::datablock::{read_data_1, read_datafile_header}; use crate::archeng::indextree::{read_channel, read_datablockref, search_record}; - use crate::archeng::{open_read, StatsChannel, EPICS_EPOCH_OFFSET}; + use crate::archeng::{StatsChannel, EPICS_EPOCH_OFFSET}; + use commonio::open_read; use err::Error; use netpod::log::*; use netpod::timeunits::*; diff --git a/archapp/src/archeng/backreadbuf.rs b/archapp/src/archeng/backreadbuf.rs index a17cab6..27fcab5 100644 --- a/archapp/src/archeng/backreadbuf.rs +++ b/archapp/src/archeng/backreadbuf.rs @@ -1,4 +1,4 @@ -use crate::archeng::{read, seek, StatsChannel}; +use commonio::{read, seek, StatsChannel}; use err::Error; use netpod::log::*; use std::borrow::BorrowMut; diff --git a/archapp/src/archeng/blockrefstream.rs b/archapp/src/archeng/blockrefstream.rs index 1a970e4..f778361 100644 --- a/archapp/src/archeng/blockrefstream.rs +++ b/archapp/src/archeng/blockrefstream.rs @@ -4,8 +4,8 @@ use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec}; use crate::archeng::indextree::{ read_datablockref2, DataheaderPos, Dataref, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget, }; -use crate::archeng::ringbuf::RingBuf; -use crate::archeng::{open_read, StatsChannel}; +use commonio::ringbuf::RingBuf; +use commonio::{open_read, StatsChannel}; use err::Error; use futures_core::{Future, Stream}; use items::WithLen; @@ -82,7 +82,10 @@ impl BlockrefStream { match self.steps { Start => { self.steps = SelectIndexFile; - Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("START"))), self))) + Ok(Some(( + BlockrefItem::JsVal(JsVal::String(format!("{} START", module_path!()))), + self, + ))) } SelectIndexFile => { let dbc = database_connect(&self.conf.database).await?; diff --git a/archapp/src/archeng/blockstream.rs b/archapp/src/archeng/blockstream.rs index a07e1ed..f5ef95d 100644 --- a/archapp/src/archeng/blockstream.rs +++ b/archapp/src/archeng/blockstream.rs @@ -1,13 +1,13 @@ -use super::indextree::DataheaderPos; -use super::ringbuf::RingBuf; -use super::{open_read, StatsChannel}; use crate::archeng::blockrefstream::BlockrefItem; use crate::archeng::datablock::{read_data2, read_datafile_header2}; -use crate::eventsitem::EventsItem; +use crate::archeng::indextree::DataheaderPos; +use commonio::ringbuf::RingBuf; +use commonio::{open_read, StatsChannel}; use err::Error; use futures_core::{Future, Stream}; use futures_util::stream::FuturesOrdered; use futures_util::StreamExt; +use items::eventsitem::EventsItem; use items::{WithLen, WithTimestamps}; use netpod::{log::*, NanoRange}; use serde::Serialize; diff --git a/archapp/src/archeng/configs.rs b/archapp/src/archeng/configs.rs index 345520f..30d0e2f 100644 --- a/archapp/src/archeng/configs.rs +++ b/archapp/src/archeng/configs.rs @@ -76,10 +76,11 @@ impl Stream for ChannelNameStream { Ready(Ok(dbc)) => { self.connect_fut = None; let off = self.off as i64; + info!("select channels off {}", off); let fut = async move { let rows = dbc .query( - "select rowid, name from channels where config = '{}'::jsonb order by name offset $1 limit 1000", + "select rowid, name from channels where config = '{}'::jsonb order by name offset $1 limit 64", &[&off], ) .await?; @@ -235,7 +236,10 @@ impl Stream for ConfigStream { match fut.await { Ok(Ok(k)) => Ok(Res::Response(k)), Ok(Err(e)) => Err(e), - Err(_) => Ok(Res::TimedOut(q.channel.name)), + Err(_) => { + warn!("timeout"); + Ok(Res::TimedOut(q.channel.name)) + } } }; self.get_fut = Some(Box::pin(fut)); diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs index 2009bc5..b99a012 100644 --- a/archapp/src/archeng/datablock.rs +++ b/archapp/src/archeng/datablock.rs @@ -1,11 +1,11 @@ -use super::format_hex_block; -use super::indextree::DataheaderPos; -use crate::archeng::ringbuf::RingBuf; -use crate::archeng::{read_exact, read_string, readf64, readu16, readu32, seek, StatsChannel, EPICS_EPOCH_OFFSET}; -use crate::eventsitem::EventsItem; -use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; +use crate::archeng::indextree::DataheaderPos; +use crate::archeng::{format_hex_block, read_string, readf64, readu16, readu32, StatsChannel, EPICS_EPOCH_OFFSET}; +use commonio::ringbuf::RingBuf; +use commonio::{read_exact, seek}; use err::Error; +use items::eventsitem::EventsItem; use items::eventvalues::EventValues; +use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; use items::waveevents::WaveEvents; use netpod::log::*; use netpod::timeunits::SEC; diff --git a/archapp/src/archeng/datablockstream.rs b/archapp/src/archeng/datablockstream.rs index 1343770..01511ec 100644 --- a/archapp/src/archeng/datablockstream.rs +++ b/archapp/src/archeng/datablockstream.rs @@ -1,14 +1,14 @@ use crate::archeng::datablock::{read_data_1, read_datafile_header}; use crate::archeng::indexfiles::index_file_path_list; use crate::archeng::indextree::{read_channel, read_datablockref, search_record, search_record_expand, DataheaderPos}; -use crate::archeng::{open_read, StatsChannel}; -use crate::eventsitem::EventsItem; use crate::storagemerge::StorageMerge; use crate::timed::Timed; use async_channel::{Receiver, Sender}; +use commonio::{open_read, StatsChannel}; use err::Error; use futures_core::{Future, Stream}; use futures_util::{FutureExt, StreamExt}; +use items::eventsitem::EventsItem; use items::{inspect_timestamps, RangeCompletableItem, Sitemty, StreamItem, WithLen}; use netpod::log::*; use netpod::{Channel, NanoRange}; @@ -317,10 +317,10 @@ impl Stream for DatablockStream { #[cfg(test)] mod test { use super::DatablockStream; - use crate::eventsitem::EventsItem; use chrono::{DateTime, Utc}; use err::Error; use futures_util::StreamExt; + use items::eventsitem::EventsItem; use items::{LogItem, Sitemty, StatsItem, StreamItem}; use netpod::timeunits::SEC; use netpod::{log::*, RangeFilterStats}; diff --git a/archapp/src/archeng/indexfiles.rs b/archapp/src/archeng/indexfiles.rs index f95357f..bdf1fd2 100644 --- a/archapp/src/archeng/indexfiles.rs +++ b/archapp/src/archeng/indexfiles.rs @@ -1,7 +1,7 @@ -use crate::archeng::{open_read, read, StatsChannel}; use crate::timed::Timed; use crate::wrap_task; use async_channel::Receiver; +use commonio::{open_read, read, StatsChannel}; use err::Error; use futures_core::{Future, Stream}; use futures_util::stream::unfold; diff --git a/archapp/src/archeng/indextree.rs b/archapp/src/archeng/indextree.rs index 52c5ec1..97f3f50 100644 --- a/archapp/src/archeng/indextree.rs +++ b/archapp/src/archeng/indextree.rs @@ -1,7 +1,6 @@ -use crate::archeng::ringbuf::RingBuf; -use crate::archeng::{ - format_hex_block, name_hash, open_read, readu16, readu32, readu64, StatsChannel, EPICS_EPOCH_OFFSET, -}; +use crate::archeng::{format_hex_block, name_hash, readu16, readu32, readu64, StatsChannel, EPICS_EPOCH_OFFSET}; +use commonio::open_read; +use commonio::ringbuf::RingBuf; use err::Error; use netpod::{log::*, NanoRange}; use netpod::{timeunits::SEC, FilePos, Nanos}; @@ -1048,7 +1047,8 @@ mod test { use crate::archeng::indextree::{ read_channel, read_datablockref, read_file_basics, search_record, IndexFileBasics, }; - use crate::archeng::{open_read, StatsChannel, EPICS_EPOCH_OFFSET}; + use crate::archeng::EPICS_EPOCH_OFFSET; + use commonio::{open_read, StatsChannel}; use err::Error; #[allow(unused)] use netpod::log::*; diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 3888aba..23d1efd 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -1,15 +1,15 @@ -use crate::binnedevents::{MultiBinWaveEvents, SingleBinWaveEvents, XBinnedEvents}; -use crate::eventsitem::EventsItem; use crate::generated::EPICSEvent::PayloadType; use crate::parse::multi::parse_all_ts; use crate::parse::PbFileReader; -use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; use crate::storagemerge::StorageMerge; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::binnedevents::{MultiBinWaveEvents, SingleBinWaveEvents, XBinnedEvents}; +use items::eventsitem::EventsItem; use items::eventvalues::EventValues; +use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; use items::waveevents::WaveEvents; use items::xbinnedscalarevents::XBinnedScalarEvents; use items::xbinnedwaveevents::XBinnedWaveEvents; diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs index 2d62444..4db8bec 100644 --- a/archapp/src/lib.rs +++ b/archapp/src/lib.rs @@ -3,14 +3,11 @@ pub mod generated; #[cfg(not(feature = "devread"))] pub mod generated {} pub mod archeng; -pub mod binnedevents; pub mod events; -pub mod eventsitem; #[cfg(feature = "devread")] pub mod parse; #[cfg(not(feature = "devread"))] pub mod parsestub; -pub mod plainevents; pub mod storagemerge; #[cfg(feature = "devread")] #[cfg(test)] diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index 4b4eef2..6e652a5 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -1,15 +1,15 @@ pub mod multi; use crate::events::parse_data_filename; -use crate::eventsitem::EventsItem; use crate::generated::EPICSEvent::PayloadType; -use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; use crate::unescape_archapp_msg; use archapp_xc::*; use async_channel::{bounded, Receiver}; use chrono::{TimeZone, Utc}; use err::Error; +use items::eventsitem::EventsItem; use items::eventvalues::EventValues; +use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; use items::waveevents::WaveEvents; use netpod::log::*; use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached}; diff --git a/archapp/src/storagemerge.rs b/archapp/src/storagemerge.rs index 27eb8ba..036c035 100644 --- a/archapp/src/storagemerge.rs +++ b/archapp/src/storagemerge.rs @@ -1,6 +1,7 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::eventsitem::EventsItem; use items::{ inspect_timestamps, Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, }; @@ -11,8 +12,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{SystemTime, UNIX_EPOCH}; -use crate::eventsitem::EventsItem; - /** Priority-Merge events from different candidate sources. diff --git a/commonio/Cargo.toml b/commonio/Cargo.toml new file mode 100644 index 0000000..12402b7 --- /dev/null +++ b/commonio/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "commonio" +version = "0.0.1-a.dev.4" +authors = ["Dominik Werder "] +edition = "2018" + +[lib] +path = "src/commonio.rs" + +[dependencies] +tokio = { version = "1.7.1", features = ["io-util", "net", "time", "sync", "fs", "parking_lot"] } +tracing = "0.1.26" +futures-core = "0.3.15" +futures-util = "0.3.15" +bytes = "1.0.1" +serde = "1.0.126" +serde_derive = "1.0.126" +serde_json = "1.0.64" +bincode = "1.3.3" +chrono = "0.4.19" +async-channel = "1.6" +parking_lot = "0.11.2" +crc32fast = "1.2.1" +err = { path = "../err" } +taskrun = { path = "../taskrun" } +netpod = { path = "../netpod" } +items = { path = "../items" } diff --git a/commonio/src/commonio.rs b/commonio/src/commonio.rs new file mode 100644 index 0000000..8a53a9a --- /dev/null +++ b/commonio/src/commonio.rs @@ -0,0 +1,173 @@ +pub mod ringbuf; + +use async_channel::Sender; +use err::Error; +use futures_util::StreamExt; +use items::eventsitem::EventsItem; +use items::{Sitemty, StatsItem, StreamItem}; +use netpod::log::*; +use netpod::{DiskStats, OpenStats, ReadExactStats, ReadStats, SeekStats}; +use std::fmt; +use std::io::{self, SeekFrom}; +use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Instant; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; + +const LOG_IO: bool = true; +const STATS_IO: bool = true; + +pub struct StatsChannel { + chn: Sender>, +} + +impl fmt::Debug for StatsChannel { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("StatsChannel").finish() + } +} + +impl StatsChannel { + pub fn new(chn: Sender>) -> Self { + Self { chn } + } + + pub fn dummy() -> Self { + let (tx, rx) = async_channel::bounded(2); + taskrun::spawn(async move { + let mut rx = rx; + while let Some(_) = rx.next().await {} + }); + Self::new(tx) + } + + pub async fn send(&self, item: StatsItem) -> Result<(), Error> { + Ok(self.chn.send(Ok(StreamItem::Stats(item))).await?) + } +} + +impl Clone for StatsChannel { + fn clone(&self) -> Self { + Self { chn: self.chn.clone() } + } +} + +/* +struct ReadExactWrap<'a> { + fut: &'a mut dyn Future>, +} + +trait TimedIo { + fn read_exact<'a, F>(&'a mut self, buf: &'a mut [u8]) -> ReadExactWrap + where + Self: Unpin; +} + +impl TimedIo for File { + fn read_exact<'a, F>(&'a mut self, buf: &'a mut [u8]) -> ReadExactWrap + where + Self: Unpin, + { + let fut = tokio::io::AsyncReadExt::read_exact(self, buf); + ReadExactWrap { fut: Box::pin(fut) } + } +} +*/ + +static CHANNEL_SEND_ERROR: AtomicUsize = AtomicUsize::new(0); + +fn channel_send_error() { + let c = CHANNEL_SEND_ERROR.fetch_add(1, Ordering::AcqRel); + if c < 10 { + error!("CHANNEL_SEND_ERROR {}", c); + } +} + +pub async fn open_read(path: PathBuf, stats: &StatsChannel) -> io::Result { + let ts1 = Instant::now(); + let res = OpenOptions::new().read(true).open(path).await; + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1); + if LOG_IO { + let dt = dt.as_secs_f64() * 1e3; + debug!("timed open_read dt: {:.3} ms", dt); + } + if STATS_IO { + if let Err(_) = stats + .send(StatsItem::DiskStats(DiskStats::OpenStats(OpenStats::new( + ts2.duration_since(ts1), + )))) + .await + { + channel_send_error(); + } + } + res +} + +pub async fn seek(file: &mut File, pos: SeekFrom, stats: &StatsChannel) -> io::Result { + let ts1 = Instant::now(); + let res = file.seek(pos).await; + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1); + if LOG_IO { + let dt = dt.as_secs_f64() * 1e3; + debug!("timed seek dt: {:.3} ms", dt); + } + if STATS_IO { + if let Err(_) = stats + .send(StatsItem::DiskStats(DiskStats::SeekStats(SeekStats::new( + ts2.duration_since(ts1), + )))) + .await + { + channel_send_error(); + } + } + res +} + +pub async fn read(file: &mut File, buf: &mut [u8], stats: &StatsChannel) -> io::Result { + let ts1 = Instant::now(); + let res = file.read(buf).await; + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1); + if LOG_IO { + let dt = dt.as_secs_f64() * 1e3; + debug!("timed read dt: {:.3} ms res: {:?}", dt, res); + } + if STATS_IO { + if let Err(_) = stats + .send(StatsItem::DiskStats(DiskStats::ReadStats(ReadStats::new( + ts2.duration_since(ts1), + )))) + .await + { + channel_send_error(); + } + } + res +} + +pub async fn read_exact(file: &mut File, buf: &mut [u8], stats: &StatsChannel) -> io::Result { + let ts1 = Instant::now(); + let res = file.read_exact(buf).await; + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1); + if LOG_IO { + let dt = dt.as_secs_f64() * 1e3; + debug!("timed read_exact dt: {:.3} ms res: {:?}", dt, res); + } + if STATS_IO { + if let Err(_) = stats + .send(StatsItem::DiskStats(DiskStats::ReadExactStats(ReadExactStats::new( + ts2.duration_since(ts1), + )))) + .await + { + channel_send_error(); + }; + } + res +} diff --git a/archapp/src/archeng/ringbuf.rs b/commonio/src/ringbuf.rs similarity index 98% rename from archapp/src/archeng/ringbuf.rs rename to commonio/src/ringbuf.rs index 7ba24c5..40df4ad 100644 --- a/archapp/src/archeng/ringbuf.rs +++ b/commonio/src/ringbuf.rs @@ -1,4 +1,4 @@ -use crate::archeng::{read, seek, StatsChannel}; +use crate::{read, seek, StatsChannel}; use err::Error; use netpod::log::*; use std::fmt; diff --git a/err/src/lib.rs b/err/src/lib.rs index 0345465..34c4ce1 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -6,6 +6,7 @@ use http::header::InvalidHeaderValue; use http::uri::InvalidUri; use nom::error::ErrorKind; use serde::{Deserialize, Serialize}; +use std::array::TryFromSliceError; use std::fmt::Debug; use std::net::AddrParseError; use std::num::{ParseFloatError, ParseIntError}; @@ -268,6 +269,12 @@ impl From for Error { } } +impl From for Error { + fn from(k: TryFromSliceError) -> Self { + Self::with_msg(format!("{:?}", k)) + } +} + pub fn todo() { todo!("TODO"); } diff --git a/fsio/src/fsio.rs b/fsio/src/fsio.rs index 4ebb94e..a5bc2b8 100644 --- a/fsio/src/fsio.rs +++ b/fsio/src/fsio.rs @@ -1,5 +1,7 @@ use err::Error; +use items::plainevents::PlainEvents; use netpod::log::*; +use netpod::Channel; #[allow(unused)] use std::os::unix::prelude::OpenOptionsExt; use std::os::unix::prelude::{AsRawFd, OsStrExt}; @@ -173,3 +175,11 @@ mod test { Ok(taskrun::run(write_1()).unwrap()) } } + +pub struct EventSink {} + +impl EventSink { + pub fn sink(&self, _channel: &Channel, _events: PlainEvents) -> Result<(), Error> { + Ok(()) + } +} diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 672a276..0136a5c 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -4,6 +4,9 @@ version = "0.0.1-a.0" authors = ["Dominik Werder "] edition = "2018" +[lib] +path = "src/httpret.rs" + [dependencies] serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs index a194e3c..0a7e730 100644 --- a/httpret/src/channelarchiver.rs +++ b/httpret/src/channelarchiver.rs @@ -279,10 +279,11 @@ impl BlockRefStream { name: channel_name, //name: "ARIDI-PCT:CURRENT".into(), }; - let s = archapp_wrap::archapp::archeng::blockrefstream::blockref_stream(channel, range, conf.clone()); + use archapp_wrap::archapp::archeng; + let s = archeng::blockrefstream::blockref_stream(channel, range, conf.clone()); let s = s.map(|item| match item { Ok(item) => { - use archapp_wrap::archapp::archeng::blockrefstream::BlockrefItem::*; + use archeng::blockrefstream::BlockrefItem::*; match item { Blockref(_k, jsval) => Ok(jsval), JsVal(jsval) => Ok(jsval), diff --git a/httpret/src/lib.rs b/httpret/src/httpret.rs similarity index 100% rename from httpret/src/lib.rs rename to httpret/src/httpret.rs diff --git a/httpret/src/search.rs b/httpret/src/search.rs index e1c000c..f719b72 100644 --- a/httpret/src/search.rs +++ b/httpret/src/search.rs @@ -2,15 +2,15 @@ use crate::response; use err::Error; use http::header; use hyper::{Body, Request, Response, StatusCode}; -use netpod::{log::*, APP_JSON}; -use netpod::{ChannelSearchQuery, NodeConfigCached}; +use netpod::log::*; +use netpod::{ChannelSearchQuery, NodeConfigCached, ACCEPT_ALL, APP_JSON}; use url::Url; pub async fn channel_search(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); let vdef = header::HeaderValue::from_static(APP_JSON); let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef); - if v == APP_JSON || v == "*/*" { + if v == APP_JSON || v == ACCEPT_ALL { let s1 = format!("dummy:{}", head.uri); info!("try to parse {:?}", s1); let url = Url::parse(&s1)?; diff --git a/archapp/src/binnedevents.rs b/items/src/binnedevents.rs similarity index 98% rename from archapp/src/binnedevents.rs rename to items/src/binnedevents.rs index 596098a..b56cccb 100644 --- a/archapp/src/binnedevents.rs +++ b/items/src/binnedevents.rs @@ -1,7 +1,6 @@ -use items::{ - xbinnedscalarevents::XBinnedScalarEvents, xbinnedwaveevents::XBinnedWaveEvents, Appendable, Clearable, - PushableIndex, WithLen, WithTimestamps, -}; +use crate::xbinnedscalarevents::XBinnedScalarEvents; +use crate::xbinnedwaveevents::XBinnedWaveEvents; +use crate::{Appendable, Clearable, PushableIndex, WithLen, WithTimestamps}; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; use crate::{ diff --git a/archapp/src/eventsitem.rs b/items/src/eventsitem.rs similarity index 96% rename from archapp/src/eventsitem.rs rename to items/src/eventsitem.rs index f7d1aa0..b4e1308 100644 --- a/archapp/src/eventsitem.rs +++ b/items/src/eventsitem.rs @@ -1,11 +1,8 @@ -use items::{Appendable, Clearable, PushableIndex, WithLen, WithTimestamps}; +use crate::binnedevents::XBinnedEvents; +use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; +use crate::{Appendable, Clearable, PushableIndex, WithLen, WithTimestamps}; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; -use crate::{ - binnedevents::XBinnedEvents, - plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}, -}; - #[derive(Debug)] pub enum EventsItem { Plain(PlainEvents), diff --git a/items/src/lib.rs b/items/src/lib.rs index 09a1e05..e304334 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -1,3 +1,18 @@ +pub mod binnedevents; +pub mod eventsitem; +pub mod eventvalues; +pub mod frame; +pub mod inmem; +pub mod minmaxavgbins; +pub mod minmaxavgdim1bins; +pub mod minmaxavgwavebins; +pub mod numops; +pub mod plainevents; +pub mod streams; +pub mod waveevents; +pub mod xbinnedscalarevents; +pub mod xbinnedwaveevents; + use crate::frame::make_frame_2; use crate::numops::BoolNum; use bytes::BytesMut; @@ -16,18 +31,6 @@ use std::task::{Context, Poll}; use tokio::fs::File; use tokio::io::{AsyncRead, ReadBuf}; -pub mod eventvalues; -pub mod frame; -pub mod inmem; -pub mod minmaxavgbins; -pub mod minmaxavgdim1bins; -pub mod minmaxavgwavebins; -pub mod numops; -pub mod streams; -pub mod waveevents; -pub mod xbinnedscalarevents; -pub mod xbinnedwaveevents; - pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100; pub const EVENT_VALUES_FRAME_TYPE_ID: u32 = 0x500; pub const MIN_MAX_AVG_BINS: u32 = 0x700; diff --git a/archapp/src/plainevents.rs b/items/src/plainevents.rs similarity index 98% rename from archapp/src/plainevents.rs rename to items/src/plainevents.rs index 34e6605..df832df 100644 --- a/archapp/src/plainevents.rs +++ b/items/src/plainevents.rs @@ -1,9 +1,9 @@ use crate::binnedevents::{SingleBinWaveEvents, XBinnedEvents}; use crate::eventsitem::EventsItem; +use crate::eventvalues::EventValues; +use crate::waveevents::{WaveEvents, WaveXBinner}; +use crate::{Appendable, Clearable, EventsNodeProcessor, PushableIndex, WithLen, WithTimestamps}; use err::Error; -use items::eventvalues::EventValues; -use items::waveevents::{WaveEvents, WaveXBinner}; -use items::{Appendable, Clearable, EventsNodeProcessor, PushableIndex, WithLen, WithTimestamps}; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; #[derive(Debug)] diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 99a819d..5abd881 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -2,7 +2,10 @@ name = "netfetch" version = "0.0.1-a.0" authors = ["Dominik Werder "] -edition = "2018" +edition = "2021" + +[lib] +path = "src/netfetch.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } diff --git a/netfetch/src/bsread.rs b/netfetch/src/bsread.rs new file mode 100644 index 0000000..59f5081 --- /dev/null +++ b/netfetch/src/bsread.rs @@ -0,0 +1,100 @@ +use crate::zmtp::ZmtpMessage; +use err::Error; +#[allow(unused)] +use netpod::log::*; +use netpod::ByteOrder; +use netpod::ScalarType; +use netpod::Shape; +use serde::Deserialize; +use serde_json::Value as JsVal; +use std::fmt; + +// TODO +pub struct ParseError { + pub err: Error, + pub msg: ZmtpMessage, +} + +#[derive(Debug, Deserialize)] +pub struct GlobalTimestamp { + sec: u64, + ns: u64, +} + +#[derive(Debug, Deserialize)] +pub struct ChannelDesc { + name: String, + #[serde(rename = "type")] + ty: String, + shape: JsVal, + encoding: String, +} + +#[derive(Debug, Deserialize)] +pub struct HeadA { + htype: String, + hash: String, + pulse_id: serde_json::Number, + global_timestamp: GlobalTimestamp, +} + +#[derive(Debug, Deserialize)] +pub struct HeadB { + htype: String, + channels: Vec, +} + +#[derive(Debug)] +pub struct BsreadMessage { + head_a: HeadA, + head_b: HeadB, + values: Vec>, +} + +pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result { + if msg.frames().len() < 3 { + return Err(Error::with_msg_no_trace("not enough frames for bsread")); + } + let head_a: HeadA = serde_json::from_slice(&msg.frames()[0].data())?; + let head_b: HeadB = serde_json::from_slice(&msg.frames()[1].data())?; + let mut values = vec![]; + if msg.frames().len() == head_b.channels.len() + 3 { + for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) { + let sty = ScalarType::from_bsread_str(ch.ty.as_str())?; + let bo = ByteOrder::from_bsread_str(&ch.encoding)?; + let shape = Shape::from_bsread_jsval(&ch.shape)?; + match sty { + ScalarType::I64 => match &bo { + ByteOrder::LE => match &shape { + Shape::Scalar => { + assert_eq!(fr.data().len(), 8); + let v = i64::from_le_bytes(fr.data().try_into()?); + values.push(Box::new(v) as _); + } + Shape::Wave(_) => {} + Shape::Image(_, _) => {} + }, + _ => {} + }, + _ => {} + } + } + } + { + let fr = &msg.frames()[msg.frames().len() - 1]; + if fr.data().len() == 8 { + let pulse = u64::from_le_bytes(fr.data().try_into()?); + info!("pulse {}", pulse); + } + } + let ret = BsreadMessage { head_a, head_b, values }; + Ok(ret) +} + +pub struct BsreadCollector {} + +impl BsreadCollector { + pub fn new>(_addr: S) -> Self { + err::todoval() + } +} diff --git a/netfetch/src/lib.rs b/netfetch/src/netfetch.rs similarity index 76% rename from netfetch/src/lib.rs rename to netfetch/src/netfetch.rs index d37227b..92a851a 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/netfetch.rs @@ -1,3 +1,4 @@ +pub mod bsread; pub mod ca; #[cfg(test)] pub mod test; diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 548d2c8..08e548d 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -9,6 +9,18 @@ use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; +use crate::bsread::parse_zmtp_message; + +#[test] +fn test_listen() -> Result<(), Error> { + use std::time::Duration; + let fut = async move { + let _ = tokio::time::timeout(Duration::from_millis(16000), zmtp_client("camtest:9999")).await; + Ok::<_, Error>(()) + }; + taskrun::run(fut) +} + pub async fn zmtp_00() -> Result<(), Error> { let addr = "S10-CPPM-MOT0991:9999"; zmtp_client(addr).await?; @@ -18,8 +30,32 @@ pub async fn zmtp_00() -> Result<(), Error> { pub async fn zmtp_client(addr: &str) -> Result<(), Error> { let conn = tokio::net::TcpStream::connect(addr).await?; let mut zmtp = Zmtp::new(conn); - while let Some(ev) = zmtp.next().await { - info!("got zmtp event: {:?}", ev); + let mut i1 = 0; + while let Some(item) = zmtp.next().await { + match item { + Ok(ev) => match ev { + ZmtpEvent::ZmtpMessage(msg) => { + info!("Message frames: {}", msg.frames.len()); + match parse_zmtp_message(&msg) { + Ok(msg) => info!("{:?}", msg), + Err(e) => { + error!("{}", e); + for frame in &msg.frames { + info!("Frame: {:?}", frame); + } + } + } + } + }, + Err(e) => { + error!("{}", e); + return Err(e); + } + } + i1 += 1; + if i1 > 100 { + break; + } } Ok(()) } @@ -36,6 +72,7 @@ enum ConnState { struct Zmtp { done: bool, + complete: bool, conn: TcpStream, conn_state: ConnState, buf: NetBuf, @@ -55,6 +92,7 @@ impl Zmtp { //info!("recv_buffer_size {:8}", conn.recv_buffer_size()?); Self { done: false, + complete: false, conn, conn_state: ConnState::InitSend, buf: NetBuf::new(), @@ -238,30 +276,46 @@ impl NetBuf { } #[derive(Debug)] -struct ZmtpMessage { +pub struct ZmtpMessage { frames: Vec, } -struct ZmtpFrame { +impl ZmtpMessage { + pub fn frames(&self) -> &Vec { + &self.frames + } +} + +pub struct ZmtpFrame { msglen: usize, has_more: bool, is_command: bool, data: Vec, } +impl ZmtpFrame { + pub fn data(&self) -> &[u8] { + &self.data + } +} + impl fmt::Debug for ZmtpFrame { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let s = String::from_utf8(self.data.clone()).unwrap_or_else(|_| String::new()); - let s = if s.is_ascii() && !s.contains("\x00") { - s - } else { - "...".into() + let data = match String::from_utf8(self.data.clone()) { + Ok(s) => s + .chars() + .filter(|x| { + // + x.is_ascii_alphanumeric() || x.is_ascii_punctuation() || x.is_ascii_whitespace() + }) + .collect::(), + Err(_) => format!("Binary {{ len: {} }}", self.data.len()), }; f.debug_struct("ZmtpFrame") .field("msglen", &self.msglen) .field("has_more", &self.has_more) .field("is_command", &self.is_command) - .field("data", &s) + .field("data", &data) .finish() } } @@ -276,7 +330,10 @@ impl Stream for Zmtp { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - if self.done { + if self.complete { + panic!("poll_next on complete") + } else if self.done { + self.complete = true; return Ready(None); } 'outer: loop { diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml index e340b31..885714d 100644 --- a/netpod/Cargo.toml +++ b/netpod/Cargo.toml @@ -4,6 +4,9 @@ version = "0.0.1-a.0" authors = ["Dominik Werder "] edition = "2018" +[lib] +path = "src/netpod.rs" + [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/netpod/src/lib.rs b/netpod/src/netpod.rs similarity index 94% rename from netpod/src/lib.rs rename to netpod/src/netpod.rs index aef1eff..cfa69a4 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/netpod.rs @@ -4,6 +4,12 @@ pub mod query; pub mod status; pub mod streamext; +use chrono::{DateTime, TimeZone, Utc}; +use err::Error; +use futures_core::Stream; +use futures_util::StreamExt; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsVal; use std::collections::BTreeMap; use std::fmt; use std::iter::FromIterator; @@ -12,21 +18,15 @@ use std::pin::Pin; use std::str::FromStr; use std::task::{Context, Poll}; use std::time::Duration; - -use chrono::{DateTime, TimeZone, Utc}; -use futures_core::Stream; -use futures_util::StreamExt; -use serde::{Deserialize, Serialize}; +use timeunits::*; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; use url::Url; -use err::Error; -use timeunits::*; - pub const APP_JSON: &'static str = "application/json"; pub const APP_JSON_LINES: &'static str = "application/jsonlines"; pub const APP_OCTET: &'static str = "application/octet-stream"; +pub const ACCEPT_ALL: &'static str = "*/*"; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AggQuerySingleChannel { @@ -83,6 +83,24 @@ impl ScalarType { Ok(g) } + pub fn from_bsread_str(s: &str) -> Result { + use ScalarType::*; + let ret = match s { + "uint8" => U8, + "uint16" => U16, + "uint32" => U32, + "uint64" => U64, + "int8" => I8, + "int16" => I16, + "int32" => I32, + "int64" => I64, + "float" => F32, + "double" => F64, + _ => return Err(Error::with_msg_no_trace(format!("can not understand bsread {}", s))), + }; + Ok(ret) + } + pub fn bytes(&self) -> u8 { use ScalarType::*; match self { @@ -356,6 +374,14 @@ impl ByteOrder { } } + pub fn from_bsread_str(s: &str) -> Result { + match s { + "little" => Ok(ByteOrder::LE), + "big" => Ok(ByteOrder::BE), + _ => Err(Error::with_msg_no_trace(format!("can not understand {}", s))), + } + } + pub fn is_le(&self) -> bool { if let Self::LE = self { true @@ -399,6 +425,26 @@ pub enum Shape { Image(u32, u32), } +impl Shape { + pub fn from_bsread_jsval(v: &JsVal) -> Result { + match v { + JsVal::Array(v) => match v.len() { + 0 => Ok(Shape::Scalar), + 1 => match &v[0] { + JsVal::Number(v) => match v.as_u64() { + Some(0) | Some(1) => Ok(Shape::Scalar), + Some(v) => Ok(Shape::Wave(v as u32)), + None => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))), + }, + _ => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))), + }, + _ => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))), + }, + _ => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))), + } + } +} + pub trait HasShape { fn shape(&self) -> Shape; }