Dominik Werder
2021-11-05 21:22:23 +01:00
parent daf3f6c14c
commit 8c7dbf9ed3
33 changed files with 527 additions and 260 deletions

View File

@@ -28,6 +28,7 @@ netpod = { path = "../netpod" }
dbconn = { path = "../dbconn" }
items = { path = "../items" }
streams = { path = "../streams" }
commonio = { path = "../commonio" }
[features]
default = ["devread"]

View File

@@ -9,189 +9,23 @@ pub mod diskio;
pub mod indexfiles;
pub mod indextree;
pub mod pipe;
pub mod ringbuf;
use self::indexfiles::list_index_files;
use self::indextree::channel_list;
use crate::eventsitem::EventsItem;
use crate::timed::Timed;
use crate::wrap_task;
use async_channel::{Receiver, Sender};
use commonio::StatsChannel;
use err::Error;
use futures_util::StreamExt;
use items::{Sitemty, StatsItem, StreamItem, WithLen};
use items::{StreamItem, WithLen};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{
ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, DiskStats, OpenStats, ReadExactStats, ReadStats,
SeekStats,
};
use netpod::{ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse};
use serde::Serialize;
use std::convert::TryInto;
use std::fmt;
use std::io::{self, SeekFrom};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
/*
struct ReadExactWrap<'a> {
fut: &'a mut dyn Future<Output = io::Result<usize>>,
}
trait TimedIo {
fn read_exact<'a, F>(&'a mut self, buf: &'a mut [u8]) -> ReadExactWrap
where
Self: Unpin;
}
impl TimedIo for File {
fn read_exact<'a, F>(&'a mut self, buf: &'a mut [u8]) -> ReadExactWrap
where
Self: Unpin,
{
let fut = tokio::io::AsyncReadExt::read_exact(self, buf);
ReadExactWrap { fut: Box::pin(fut) }
}
}
*/
const EPICS_EPOCH_OFFSET: u64 = 631152000 * SEC;
const LOG_IO: bool = true;
const STATS_IO: bool = true;
static CHANNEL_SEND_ERROR: AtomicUsize = AtomicUsize::new(0);
fn channel_send_error() {
let c = CHANNEL_SEND_ERROR.fetch_add(1, Ordering::AcqRel);
if c < 10 {
error!("CHANNEL_SEND_ERROR {}", c);
}
}
pub struct StatsChannel {
chn: Sender<Sitemty<EventsItem>>,
}
impl fmt::Debug for StatsChannel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("StatsChannel").finish()
}
}
impl StatsChannel {
pub fn new(chn: Sender<Sitemty<EventsItem>>) -> Self {
Self { chn }
}
pub fn dummy() -> Self {
let (tx, rx) = async_channel::bounded(2);
taskrun::spawn(async move {
let mut rx = rx;
while let Some(_) = rx.next().await {}
});
Self::new(tx)
}
pub async fn send(&self, item: StatsItem) -> Result<(), Error> {
Ok(self.chn.send(Ok(StreamItem::Stats(item))).await?)
}
}
impl Clone for StatsChannel {
fn clone(&self) -> Self {
Self { chn: self.chn.clone() }
}
}
pub async fn open_read(path: PathBuf, stats: &StatsChannel) -> io::Result<File> {
let ts1 = Instant::now();
let res = OpenOptions::new().read(true).open(path).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
if LOG_IO {
let dt = dt.as_secs_f64() * 1e3;
debug!("timed open_read dt: {:.3} ms", dt);
}
if STATS_IO {
if let Err(_) = stats
.send(StatsItem::DiskStats(DiskStats::OpenStats(OpenStats::new(
ts2.duration_since(ts1),
))))
.await
{
channel_send_error();
}
}
res
}
async fn seek(file: &mut File, pos: SeekFrom, stats: &StatsChannel) -> io::Result<u64> {
let ts1 = Instant::now();
let res = file.seek(pos).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
if LOG_IO {
let dt = dt.as_secs_f64() * 1e3;
debug!("timed seek dt: {:.3} ms", dt);
}
if STATS_IO {
if let Err(_) = stats
.send(StatsItem::DiskStats(DiskStats::SeekStats(SeekStats::new(
ts2.duration_since(ts1),
))))
.await
{
channel_send_error();
}
}
res
}
async fn read(file: &mut File, buf: &mut [u8], stats: &StatsChannel) -> io::Result<usize> {
let ts1 = Instant::now();
let res = file.read(buf).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
if LOG_IO {
let dt = dt.as_secs_f64() * 1e3;
debug!("timed read dt: {:.3} ms res: {:?}", dt, res);
}
if STATS_IO {
if let Err(_) = stats
.send(StatsItem::DiskStats(DiskStats::ReadStats(ReadStats::new(
ts2.duration_since(ts1),
))))
.await
{
channel_send_error();
}
}
res
}
async fn read_exact(file: &mut File, buf: &mut [u8], stats: &StatsChannel) -> io::Result<usize> {
let ts1 = Instant::now();
let res = file.read_exact(buf).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
if LOG_IO {
let dt = dt.as_secs_f64() * 1e3;
debug!("timed read_exact dt: {:.3} ms res: {:?}", dt, res);
}
if STATS_IO {
if let Err(_) = stats
.send(StatsItem::DiskStats(DiskStats::ReadExactStats(ReadExactStats::new(
ts2.duration_since(ts1),
))))
.await
{
channel_send_error();
};
}
res
}
pub fn name_hash(s: &str, ht_len: u32) -> u32 {
let mut h = 0;
@@ -334,7 +168,6 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R
let stream = Box::pin(stream);
let stream = blockstream::BlockStream::new(stream, q.range.clone(), 1);
let mut stream = stream;
let timed_expand = Timed::new("channel_config EXPAND");
while let Some(item) = stream.next().await {
use blockstream::BlockItem::*;
match item {
@@ -346,7 +179,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R
}
}
JsVal(jsval) => {
if false {
if true {
info!("jsval: {}", serde_json::to_string(&jsval)?);
}
}
@@ -357,7 +190,6 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R
}
}
}
drop(timed_expand);
if type_info.is_none() {
let timed_normal = Timed::new("channel_config NORMAL");
warn!("channel_config expand mode returned none");
@@ -376,7 +208,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R
}
}
JsVal(jsval) => {
if false {
if true {
info!("jsval: {}", serde_json::to_string(&jsval)?);
}
}
@@ -406,7 +238,8 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R
mod test {
use crate::archeng::datablock::{read_data_1, read_datafile_header};
use crate::archeng::indextree::{read_channel, read_datablockref, search_record};
use crate::archeng::{open_read, StatsChannel, EPICS_EPOCH_OFFSET};
use crate::archeng::{StatsChannel, EPICS_EPOCH_OFFSET};
use commonio::open_read;
use err::Error;
use netpod::log::*;
use netpod::timeunits::*;
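
Aside, since this constant stays in archeng: EPICS_EPOCH_OFFSET bridges the EPICS epoch (1990-01-01 UTC) and the Unix epoch (1970-01-01 UTC); the gap is 7305 days = 631152000 seconds, scaled to nanoseconds via SEC. A minimal sketch of the conversion, assuming SEC is nanoseconds per second as elsewhere in netpod::timeunits; the helper name is made up:

use netpod::timeunits::SEC;

// EPICS timestamps count from 1990-01-01 UTC, Unix time from 1970-01-01 UTC.
const EPICS_EPOCH_OFFSET: u64 = 631152000 * SEC;

// Hypothetical helper: shift an EPICS nanosecond timestamp onto the Unix epoch.
fn epics_ns_to_unix_ns(epics_ns: u64) -> u64 {
    epics_ns + EPICS_EPOCH_OFFSET
}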

View File

@@ -1,4 +1,4 @@
use crate::archeng::{read, seek, StatsChannel};
use commonio::{read, seek, StatsChannel};
use err::Error;
use netpod::log::*;
use std::borrow::BorrowMut;

View File

@@ -4,8 +4,8 @@ use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec};
use crate::archeng::indextree::{
read_datablockref2, DataheaderPos, Dataref, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget,
};
use crate::archeng::ringbuf::RingBuf;
use crate::archeng::{open_read, StatsChannel};
use commonio::ringbuf::RingBuf;
use commonio::{open_read, StatsChannel};
use err::Error;
use futures_core::{Future, Stream};
use items::WithLen;
@@ -82,7 +82,10 @@ impl BlockrefStream {
match self.steps {
Start => {
self.steps = SelectIndexFile;
Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("START"))), self)))
Ok(Some((
BlockrefItem::JsVal(JsVal::String(format!("{} START", module_path!()))),
self,
)))
}
SelectIndexFile => {
let dbc = database_connect(&self.conf.database).await?;

View File

@@ -1,13 +1,13 @@
use super::indextree::DataheaderPos;
use super::ringbuf::RingBuf;
use super::{open_read, StatsChannel};
use crate::archeng::blockrefstream::BlockrefItem;
use crate::archeng::datablock::{read_data2, read_datafile_header2};
use crate::eventsitem::EventsItem;
use crate::archeng::indextree::DataheaderPos;
use commonio::ringbuf::RingBuf;
use commonio::{open_read, StatsChannel};
use err::Error;
use futures_core::{Future, Stream};
use futures_util::stream::FuturesOrdered;
use futures_util::StreamExt;
use items::eventsitem::EventsItem;
use items::{WithLen, WithTimestamps};
use netpod::{log::*, NanoRange};
use serde::Serialize;

View File

@@ -76,10 +76,11 @@ impl Stream for ChannelNameStream {
Ready(Ok(dbc)) => {
self.connect_fut = None;
let off = self.off as i64;
info!("select channels off {}", off);
let fut = async move {
let rows = dbc
.query(
"select rowid, name from channels where config = '{}'::jsonb order by name offset $1 limit 1000",
"select rowid, name from channels where config = '{}'::jsonb order by name offset $1 limit 64",
&[&off],
)
.await?;
@@ -235,7 +236,10 @@ impl Stream for ConfigStream {
match fut.await {
Ok(Ok(k)) => Ok(Res::Response(k)),
Ok(Err(e)) => Err(e),
Err(_) => Ok(Res::TimedOut(q.channel.name)),
Err(_) => {
warn!("timeout");
Ok(Res::TimedOut(q.channel.name))
}
}
};
self.get_fut = Some(Box::pin(fut));
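
The three-way match above is the usual shape of a timeout wrapper around a fallible future (presumably tokio::time::timeout, which this commit also uses in the zmtp test further down): the outer Err is the elapsed deadline, the inner Result is the operation itself. A generic sketch with a made-up duration:

use std::future::Future;
use std::time::Duration;

// timeout(..) nests the inner Result inside its own, so callers can tell
// apart success, inner failure, and a hit deadline (reported as TimedOut).
async fn with_deadline<T, E>(fut: impl Future<Output = Result<T, E>>) -> Option<Result<T, E>> {
    match tokio::time::timeout(Duration::from_secs(5), fut).await {
        Ok(inner) => Some(inner), // inner Ok(k) or Err(e)
        Err(_elapsed) => None,    // deadline passed
    }
}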

View File

@@ -1,11 +1,11 @@
use super::format_hex_block;
use super::indextree::DataheaderPos;
use crate::archeng::ringbuf::RingBuf;
use crate::archeng::{read_exact, read_string, readf64, readu16, readu32, seek, StatsChannel, EPICS_EPOCH_OFFSET};
use crate::eventsitem::EventsItem;
use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use crate::archeng::indextree::DataheaderPos;
use crate::archeng::{format_hex_block, read_string, readf64, readu16, readu32, StatsChannel, EPICS_EPOCH_OFFSET};
use commonio::ringbuf::RingBuf;
use commonio::{read_exact, seek};
use err::Error;
use items::eventsitem::EventsItem;
use items::eventvalues::EventValues;
use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use items::waveevents::WaveEvents;
use netpod::log::*;
use netpod::timeunits::SEC;

View File

@@ -1,14 +1,14 @@
use crate::archeng::datablock::{read_data_1, read_datafile_header};
use crate::archeng::indexfiles::index_file_path_list;
use crate::archeng::indextree::{read_channel, read_datablockref, search_record, search_record_expand, DataheaderPos};
use crate::archeng::{open_read, StatsChannel};
use crate::eventsitem::EventsItem;
use crate::storagemerge::StorageMerge;
use crate::timed::Timed;
use async_channel::{Receiver, Sender};
use commonio::{open_read, StatsChannel};
use err::Error;
use futures_core::{Future, Stream};
use futures_util::{FutureExt, StreamExt};
use items::eventsitem::EventsItem;
use items::{inspect_timestamps, RangeCompletableItem, Sitemty, StreamItem, WithLen};
use netpod::log::*;
use netpod::{Channel, NanoRange};
@@ -317,10 +317,10 @@ impl Stream for DatablockStream {
#[cfg(test)]
mod test {
use super::DatablockStream;
use crate::eventsitem::EventsItem;
use chrono::{DateTime, Utc};
use err::Error;
use futures_util::StreamExt;
use items::eventsitem::EventsItem;
use items::{LogItem, Sitemty, StatsItem, StreamItem};
use netpod::timeunits::SEC;
use netpod::{log::*, RangeFilterStats};

View File

@@ -1,7 +1,7 @@
use crate::archeng::{open_read, read, StatsChannel};
use crate::timed::Timed;
use crate::wrap_task;
use async_channel::Receiver;
use commonio::{open_read, read, StatsChannel};
use err::Error;
use futures_core::{Future, Stream};
use futures_util::stream::unfold;

View File

@@ -1,7 +1,6 @@
use crate::archeng::ringbuf::RingBuf;
use crate::archeng::{
format_hex_block, name_hash, open_read, readu16, readu32, readu64, StatsChannel, EPICS_EPOCH_OFFSET,
};
use crate::archeng::{format_hex_block, name_hash, readu16, readu32, readu64, StatsChannel, EPICS_EPOCH_OFFSET};
use commonio::open_read;
use commonio::ringbuf::RingBuf;
use err::Error;
use netpod::{log::*, NanoRange};
use netpod::{timeunits::SEC, FilePos, Nanos};
@@ -1048,7 +1047,8 @@ mod test {
use crate::archeng::indextree::{
read_channel, read_datablockref, read_file_basics, search_record, IndexFileBasics,
};
use crate::archeng::{open_read, StatsChannel, EPICS_EPOCH_OFFSET};
use crate::archeng::EPICS_EPOCH_OFFSET;
use commonio::{open_read, StatsChannel};
use err::Error;
#[allow(unused)]
use netpod::log::*;

View File

@@ -1,15 +1,15 @@
use crate::binnedevents::{MultiBinWaveEvents, SingleBinWaveEvents, XBinnedEvents};
use crate::eventsitem::EventsItem;
use crate::generated::EPICSEvent::PayloadType;
use crate::parse::multi::parse_all_ts;
use crate::parse::PbFileReader;
use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use crate::storagemerge::StorageMerge;
use chrono::{TimeZone, Utc};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::binnedevents::{MultiBinWaveEvents, SingleBinWaveEvents, XBinnedEvents};
use items::eventsitem::EventsItem;
use items::eventvalues::EventValues;
use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use items::waveevents::WaveEvents;
use items::xbinnedscalarevents::XBinnedScalarEvents;
use items::xbinnedwaveevents::XBinnedWaveEvents;

View File

@@ -3,14 +3,11 @@ pub mod generated;
#[cfg(not(feature = "devread"))]
pub mod generated {}
pub mod archeng;
pub mod binnedevents;
pub mod events;
pub mod eventsitem;
#[cfg(feature = "devread")]
pub mod parse;
#[cfg(not(feature = "devread"))]
pub mod parsestub;
pub mod plainevents;
pub mod storagemerge;
#[cfg(feature = "devread")]
#[cfg(test)]

View File

@@ -1,15 +1,15 @@
pub mod multi;
use crate::events::parse_data_filename;
use crate::eventsitem::EventsItem;
use crate::generated::EPICSEvent::PayloadType;
use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use crate::unescape_archapp_msg;
use archapp_xc::*;
use async_channel::{bounded, Receiver};
use chrono::{TimeZone, Utc};
use err::Error;
use items::eventsitem::EventsItem;
use items::eventvalues::EventValues;
use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use items::waveevents::WaveEvents;
use netpod::log::*;
use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached};

View File

@@ -1,6 +1,7 @@
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::eventsitem::EventsItem;
use items::{
inspect_timestamps, Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem,
};
@@ -11,8 +12,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::eventsitem::EventsItem;
/**
Priority-Merge events from different candidate sources.

commonio/Cargo.toml (new file, 27 lines)
View File

@@ -0,0 +1,27 @@
[package]
name = "commonio"
version = "0.0.1-a.dev.4"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
[lib]
path = "src/commonio.rs"
[dependencies]
tokio = { version = "1.7.1", features = ["io-util", "net", "time", "sync", "fs", "parking_lot"] }
tracing = "0.1.26"
futures-core = "0.3.15"
futures-util = "0.3.15"
bytes = "1.0.1"
serde = "1.0.126"
serde_derive = "1.0.126"
serde_json = "1.0.64"
bincode = "1.3.3"
chrono = "0.4.19"
async-channel = "1.6"
parking_lot = "0.11.2"
crc32fast = "1.2.1"
err = { path = "../err" }
taskrun = { path = "../taskrun" }
netpod = { path = "../netpod" }
items = { path = "../items" }

commonio/src/commonio.rs (new file, 173 lines)
View File

@@ -0,0 +1,173 @@
pub mod ringbuf;
use async_channel::Sender;
use err::Error;
use futures_util::StreamExt;
use items::eventsitem::EventsItem;
use items::{Sitemty, StatsItem, StreamItem};
use netpod::log::*;
use netpod::{DiskStats, OpenStats, ReadExactStats, ReadStats, SeekStats};
use std::fmt;
use std::io::{self, SeekFrom};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
const LOG_IO: bool = true;
const STATS_IO: bool = true;
pub struct StatsChannel {
chn: Sender<Sitemty<EventsItem>>,
}
impl fmt::Debug for StatsChannel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("StatsChannel").finish()
}
}
impl StatsChannel {
pub fn new(chn: Sender<Sitemty<EventsItem>>) -> Self {
Self { chn }
}
pub fn dummy() -> Self {
let (tx, rx) = async_channel::bounded(2);
taskrun::spawn(async move {
let mut rx = rx;
while let Some(_) = rx.next().await {}
});
Self::new(tx)
}
pub async fn send(&self, item: StatsItem) -> Result<(), Error> {
Ok(self.chn.send(Ok(StreamItem::Stats(item))).await?)
}
}
impl Clone for StatsChannel {
fn clone(&self) -> Self {
Self { chn: self.chn.clone() }
}
}
/*
struct ReadExactWrap<'a> {
fut: &'a mut dyn Future<Output = io::Result<usize>>,
}
trait TimedIo {
fn read_exact<'a, F>(&'a mut self, buf: &'a mut [u8]) -> ReadExactWrap
where
Self: Unpin;
}
impl TimedIo for File {
fn read_exact<'a, F>(&'a mut self, buf: &'a mut [u8]) -> ReadExactWrap
where
Self: Unpin,
{
let fut = tokio::io::AsyncReadExt::read_exact(self, buf);
ReadExactWrap { fut: Box::pin(fut) }
}
}
*/
static CHANNEL_SEND_ERROR: AtomicUsize = AtomicUsize::new(0);
fn channel_send_error() {
let c = CHANNEL_SEND_ERROR.fetch_add(1, Ordering::AcqRel);
if c < 10 {
error!("CHANNEL_SEND_ERROR {}", c);
}
}
pub async fn open_read(path: PathBuf, stats: &StatsChannel) -> io::Result<File> {
let ts1 = Instant::now();
let res = OpenOptions::new().read(true).open(path).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
if LOG_IO {
let dt = dt.as_secs_f64() * 1e3;
debug!("timed open_read dt: {:.3} ms", dt);
}
if STATS_IO {
if let Err(_) = stats
.send(StatsItem::DiskStats(DiskStats::OpenStats(OpenStats::new(
ts2.duration_since(ts1),
))))
.await
{
channel_send_error();
}
}
res
}
pub async fn seek(file: &mut File, pos: SeekFrom, stats: &StatsChannel) -> io::Result<u64> {
let ts1 = Instant::now();
let res = file.seek(pos).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
if LOG_IO {
let dt = dt.as_secs_f64() * 1e3;
debug!("timed seek dt: {:.3} ms", dt);
}
if STATS_IO {
if let Err(_) = stats
.send(StatsItem::DiskStats(DiskStats::SeekStats(SeekStats::new(
ts2.duration_since(ts1),
))))
.await
{
channel_send_error();
}
}
res
}
pub async fn read(file: &mut File, buf: &mut [u8], stats: &StatsChannel) -> io::Result<usize> {
let ts1 = Instant::now();
let res = file.read(buf).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
if LOG_IO {
let dt = dt.as_secs_f64() * 1e3;
debug!("timed read dt: {:.3} ms res: {:?}", dt, res);
}
if STATS_IO {
if let Err(_) = stats
.send(StatsItem::DiskStats(DiskStats::ReadStats(ReadStats::new(
ts2.duration_since(ts1),
))))
.await
{
channel_send_error();
}
}
res
}
pub async fn read_exact(file: &mut File, buf: &mut [u8], stats: &StatsChannel) -> io::Result<usize> {
let ts1 = Instant::now();
let res = file.read_exact(buf).await;
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
if LOG_IO {
let dt = dt.as_secs_f64() * 1e3;
debug!("timed read_exact dt: {:.3} ms res: {:?}", dt, res);
}
if STATS_IO {
if let Err(_) = stats
.send(StatsItem::DiskStats(DiskStats::ReadExactStats(ReadExactStats::new(
ts2.duration_since(ts1),
))))
.await
{
channel_send_error();
};
}
res
}
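
For reference, a minimal sketch of how a caller drives the new crate (signatures as in this file; the path is made up). StatsChannel::dummy() spawns a task that drains and discards the stats items, so the timed wrappers also work without a real consumer:

use commonio::{open_read, read_exact, StatsChannel};
use std::io;

// Open a file and read a fixed-size header through the timed wrappers;
// open and read durations are emitted on the stats channel as DiskStats.
async fn read_header() -> io::Result<[u8; 16]> {
    let stats = StatsChannel::dummy();
    let mut file = open_read("/tmp/example.idx".into(), &stats).await?;
    let mut buf = [0u8; 16];
    read_exact(&mut file, &mut buf, &stats).await?;
    Ok(buf)
}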

View File

@@ -1,4 +1,4 @@
use crate::archeng::{read, seek, StatsChannel};
use crate::{read, seek, StatsChannel};
use err::Error;
use netpod::log::*;
use std::fmt;

View File

@@ -6,6 +6,7 @@ use http::header::InvalidHeaderValue;
use http::uri::InvalidUri;
use nom::error::ErrorKind;
use serde::{Deserialize, Serialize};
use std::array::TryFromSliceError;
use std::fmt::Debug;
use std::net::AddrParseError;
use std::num::{ParseFloatError, ParseIntError};
@@ -268,6 +269,12 @@ impl From<url::ParseError> for Error {
}
}
impl From<TryFromSliceError> for Error {
fn from(k: TryFromSliceError) -> Self {
Self::with_msg(format!("{:?}", k))
}
}
pub fn todo() {
todo!("TODO");
}
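
The new From<TryFromSliceError> impl is what lets the byte-slice-to-array conversions in bsread.rs below use the ? operator; a minimal sketch of such a call site:

use err::Error;
use std::convert::TryInto;

// &[u8] -> [u8; 8] fails with TryFromSliceError when the lengths differ;
// the From impl above turns that into err::Error so `?` applies.
fn decode_pulse(data: &[u8]) -> Result<u64, Error> {
    let arr: [u8; 8] = data.try_into()?;
    Ok(u64::from_le_bytes(arr))
}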

View File

@@ -1,5 +1,7 @@
use err::Error;
use items::plainevents::PlainEvents;
use netpod::log::*;
use netpod::Channel;
#[allow(unused)]
use std::os::unix::prelude::OpenOptionsExt;
use std::os::unix::prelude::{AsRawFd, OsStrExt};
@@ -173,3 +175,11 @@ mod test {
Ok(taskrun::run(write_1()).unwrap())
}
}
pub struct EventSink {}
impl EventSink {
pub fn sink(&self, _channel: &Channel, _events: PlainEvents) -> Result<(), Error> {
Ok(())
}
}

View File

@@ -4,6 +4,9 @@ version = "0.0.1-a.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
[lib]
path = "src/httpret.rs"
[dependencies]
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -279,10 +279,11 @@ impl BlockRefStream {
name: channel_name,
//name: "ARIDI-PCT:CURRENT".into(),
};
let s = archapp_wrap::archapp::archeng::blockrefstream::blockref_stream(channel, range, conf.clone());
use archapp_wrap::archapp::archeng;
let s = archeng::blockrefstream::blockref_stream(channel, range, conf.clone());
let s = s.map(|item| match item {
Ok(item) => {
use archapp_wrap::archapp::archeng::blockrefstream::BlockrefItem::*;
use archeng::blockrefstream::BlockrefItem::*;
match item {
Blockref(_k, jsval) => Ok(jsval),
JsVal(jsval) => Ok(jsval),

View File

@@ -2,15 +2,15 @@ use crate::response;
use err::Error;
use http::header;
use hyper::{Body, Request, Response, StatusCode};
use netpod::{log::*, APP_JSON};
use netpod::{ChannelSearchQuery, NodeConfigCached};
use netpod::log::*;
use netpod::{ChannelSearchQuery, NodeConfigCached, ACCEPT_ALL, APP_JSON};
use url::Url;
pub async fn channel_search(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
let vdef = header::HeaderValue::from_static(APP_JSON);
let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
if v == APP_JSON || v == "*/*" {
if v == APP_JSON || v == ACCEPT_ALL {
let s1 = format!("dummy:{}", head.uri);
info!("try to parse {:?}", s1);
let url = Url::parse(&s1)?;

View File

@@ -1,7 +1,6 @@
use items::{
xbinnedscalarevents::XBinnedScalarEvents, xbinnedwaveevents::XBinnedWaveEvents, Appendable, Clearable,
PushableIndex, WithLen, WithTimestamps,
};
use crate::xbinnedscalarevents::XBinnedScalarEvents;
use crate::xbinnedwaveevents::XBinnedWaveEvents;
use crate::{Appendable, Clearable, PushableIndex, WithLen, WithTimestamps};
use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape};
use crate::{

View File

@@ -1,11 +1,8 @@
use items::{Appendable, Clearable, PushableIndex, WithLen, WithTimestamps};
use crate::binnedevents::XBinnedEvents;
use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents};
use crate::{Appendable, Clearable, PushableIndex, WithLen, WithTimestamps};
use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape};
use crate::{
binnedevents::XBinnedEvents,
plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents},
};
#[derive(Debug)]
pub enum EventsItem {
Plain(PlainEvents),

View File

@@ -1,3 +1,18 @@
pub mod binnedevents;
pub mod eventsitem;
pub mod eventvalues;
pub mod frame;
pub mod inmem;
pub mod minmaxavgbins;
pub mod minmaxavgdim1bins;
pub mod minmaxavgwavebins;
pub mod numops;
pub mod plainevents;
pub mod streams;
pub mod waveevents;
pub mod xbinnedscalarevents;
pub mod xbinnedwaveevents;
use crate::frame::make_frame_2;
use crate::numops::BoolNum;
use bytes::BytesMut;
@@ -16,18 +31,6 @@ use std::task::{Context, Poll};
use tokio::fs::File;
use tokio::io::{AsyncRead, ReadBuf};
pub mod eventvalues;
pub mod frame;
pub mod inmem;
pub mod minmaxavgbins;
pub mod minmaxavgdim1bins;
pub mod minmaxavgwavebins;
pub mod numops;
pub mod streams;
pub mod waveevents;
pub mod xbinnedscalarevents;
pub mod xbinnedwaveevents;
pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100;
pub const EVENT_VALUES_FRAME_TYPE_ID: u32 = 0x500;
pub const MIN_MAX_AVG_BINS: u32 = 0x700;

View File

@@ -1,9 +1,9 @@
use crate::binnedevents::{SingleBinWaveEvents, XBinnedEvents};
use crate::eventsitem::EventsItem;
use crate::eventvalues::EventValues;
use crate::waveevents::{WaveEvents, WaveXBinner};
use crate::{Appendable, Clearable, EventsNodeProcessor, PushableIndex, WithLen, WithTimestamps};
use err::Error;
use items::eventvalues::EventValues;
use items::waveevents::{WaveEvents, WaveXBinner};
use items::{Appendable, Clearable, EventsNodeProcessor, PushableIndex, WithLen, WithTimestamps};
use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape};
#[derive(Debug)]

View File

@@ -2,7 +2,10 @@
name = "netfetch"
version = "0.0.1-a.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
edition = "2021"
[lib]
path = "src/netfetch.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }

netfetch/src/bsread.rs (new file, 100 lines)
View File

@@ -0,0 +1,100 @@
use crate::zmtp::ZmtpMessage;
use err::Error;
#[allow(unused)]
use netpod::log::*;
use netpod::ByteOrder;
use netpod::ScalarType;
use netpod::Shape;
use serde::Deserialize;
use serde_json::Value as JsVal;
use std::fmt;
// TODO
pub struct ParseError {
pub err: Error,
pub msg: ZmtpMessage,
}
#[derive(Debug, Deserialize)]
pub struct GlobalTimestamp {
sec: u64,
ns: u64,
}
#[derive(Debug, Deserialize)]
pub struct ChannelDesc {
name: String,
#[serde(rename = "type")]
ty: String,
shape: JsVal,
encoding: String,
}
#[derive(Debug, Deserialize)]
pub struct HeadA {
htype: String,
hash: String,
pulse_id: serde_json::Number,
global_timestamp: GlobalTimestamp,
}
#[derive(Debug, Deserialize)]
pub struct HeadB {
htype: String,
channels: Vec<ChannelDesc>,
}
#[derive(Debug)]
pub struct BsreadMessage {
head_a: HeadA,
head_b: HeadB,
values: Vec<Box<dyn fmt::Debug>>,
}
pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result<BsreadMessage, Error> {
if msg.frames().len() < 3 {
return Err(Error::with_msg_no_trace("not enough frames for bsread"));
}
let head_a: HeadA = serde_json::from_slice(&msg.frames()[0].data())?;
let head_b: HeadB = serde_json::from_slice(&msg.frames()[1].data())?;
let mut values = vec![];
if msg.frames().len() == head_b.channels.len() + 3 {
for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) {
let sty = ScalarType::from_bsread_str(ch.ty.as_str())?;
let bo = ByteOrder::from_bsread_str(&ch.encoding)?;
let shape = Shape::from_bsread_jsval(&ch.shape)?;
match sty {
ScalarType::I64 => match &bo {
ByteOrder::LE => match &shape {
Shape::Scalar => {
assert_eq!(fr.data().len(), 8);
let v = i64::from_le_bytes(fr.data().try_into()?);
values.push(Box::new(v) as _);
}
Shape::Wave(_) => {}
Shape::Image(_, _) => {}
},
_ => {}
},
_ => {}
}
}
}
{
let fr = &msg.frames()[msg.frames().len() - 1];
if fr.data().len() == 8 {
let pulse = u64::from_le_bytes(fr.data().try_into()?);
info!("pulse {}", pulse);
}
}
let ret = BsreadMessage { head_a, head_b, values };
Ok(ret)
}
pub struct BsreadCollector {}
impl BsreadCollector {
pub fn new<S: Into<String>>(_addr: S) -> Self {
err::todoval()
}
}
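
For orientation, the shape of main-header JSON that HeadA is written to accept. The concrete values, including the "bsr_m-1.1" htype string, are made-up samples following the bsread convention, not something this commit pins down:

#[test]
fn head_a_parses_sketch() {
    // Field names match HeadA and GlobalTimestamp above.
    let json = r#"{
        "htype": "bsr_m-1.1",
        "hash": "abc123",
        "pulse_id": 17,
        "global_timestamp": { "sec": 1636142543, "ns": 500000000 }
    }"#;
    let h: HeadA = serde_json::from_str(json).unwrap();
    let _ = h;
}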

View File

@@ -1,3 +1,4 @@
pub mod bsread;
pub mod ca;
#[cfg(test)]
pub mod test;

View File

@@ -9,6 +9,18 @@ use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
use crate::bsread::parse_zmtp_message;
#[test]
fn test_listen() -> Result<(), Error> {
use std::time::Duration;
let fut = async move {
let _ = tokio::time::timeout(Duration::from_millis(16000), zmtp_client("camtest:9999")).await;
Ok::<_, Error>(())
};
taskrun::run(fut)
}
pub async fn zmtp_00() -> Result<(), Error> {
let addr = "S10-CPPM-MOT0991:9999";
zmtp_client(addr).await?;
@@ -18,8 +30,32 @@ pub async fn zmtp_00() -> Result<(), Error> {
pub async fn zmtp_client(addr: &str) -> Result<(), Error> {
let conn = tokio::net::TcpStream::connect(addr).await?;
let mut zmtp = Zmtp::new(conn);
while let Some(ev) = zmtp.next().await {
info!("got zmtp event: {:?}", ev);
let mut i1 = 0;
while let Some(item) = zmtp.next().await {
match item {
Ok(ev) => match ev {
ZmtpEvent::ZmtpMessage(msg) => {
info!("Message frames: {}", msg.frames.len());
match parse_zmtp_message(&msg) {
Ok(msg) => info!("{:?}", msg),
Err(e) => {
error!("{}", e);
for frame in &msg.frames {
info!("Frame: {:?}", frame);
}
}
}
}
},
Err(e) => {
error!("{}", e);
return Err(e);
}
}
i1 += 1;
if i1 > 100 {
break;
}
}
Ok(())
}
@@ -36,6 +72,7 @@ enum ConnState {
struct Zmtp {
done: bool,
complete: bool,
conn: TcpStream,
conn_state: ConnState,
buf: NetBuf,
@@ -55,6 +92,7 @@ impl Zmtp {
//info!("recv_buffer_size {:8}", conn.recv_buffer_size()?);
Self {
done: false,
complete: false,
conn,
conn_state: ConnState::InitSend,
buf: NetBuf::new(),
@@ -238,30 +276,46 @@ impl NetBuf {
}
#[derive(Debug)]
struct ZmtpMessage {
pub struct ZmtpMessage {
frames: Vec<ZmtpFrame>,
}
struct ZmtpFrame {
impl ZmtpMessage {
pub fn frames(&self) -> &Vec<ZmtpFrame> {
&self.frames
}
}
pub struct ZmtpFrame {
msglen: usize,
has_more: bool,
is_command: bool,
data: Vec<u8>,
}
impl ZmtpFrame {
pub fn data(&self) -> &[u8] {
&self.data
}
}
impl fmt::Debug for ZmtpFrame {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let s = String::from_utf8(self.data.clone()).unwrap_or_else(|_| String::new());
let s = if s.is_ascii() && !s.contains("\x00") {
s
} else {
"...".into()
let data = match String::from_utf8(self.data.clone()) {
Ok(s) => s
.chars()
.filter(|x| {
//
x.is_ascii_alphanumeric() || x.is_ascii_punctuation() || x.is_ascii_whitespace()
})
.collect::<String>(),
Err(_) => format!("Binary {{ len: {} }}", self.data.len()),
};
f.debug_struct("ZmtpFrame")
.field("msglen", &self.msglen)
.field("has_more", &self.has_more)
.field("is_command", &self.is_command)
.field("data", &s)
.field("data", &data)
.finish()
}
}
@@ -276,7 +330,10 @@ impl Stream for Zmtp {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.done {
if self.complete {
panic!("poll_next on complete")
} else if self.done {
self.complete = true;
return Ready(None);
}
'outer: loop {
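
The done/complete pair added above acts as a poll fuse: the stream yields Ready(None) exactly once, and any poll after that panics loudly instead of misbehaving quietly. A minimal generic sketch of the same idiom (struct name and bounds are made up):

use futures_core::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Fused<S> {
    inner: S,
    done: bool,
    complete: bool,
}

impl<S: Stream + Unpin> Stream for Fused<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        loop {
            if self.complete {
                panic!("poll_next on complete")
            } else if self.done {
                // First poll after the latch: emit the final None.
                self.complete = true;
                return Poll::Ready(None);
            }
            match Pin::new(&mut self.inner).poll_next(cx) {
                // Latch end-of-stream; the branch above handles it.
                Poll::Ready(None) => self.done = true,
                other => return other,
            }
        }
    }
}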

View File

@@ -4,6 +4,9 @@ version = "0.0.1-a.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
[lib]
path = "src/netpod.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@@ -4,6 +4,12 @@ pub mod query;
pub mod status;
pub mod streamext;
use chrono::{DateTime, TimeZone, Utc};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsVal;
use std::collections::BTreeMap;
use std::fmt;
use std::iter::FromIterator;
@@ -12,21 +18,15 @@ use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::time::Duration;
use chrono::{DateTime, TimeZone, Utc};
use futures_core::Stream;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use timeunits::*;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
use url::Url;
use err::Error;
use timeunits::*;
pub const APP_JSON: &'static str = "application/json";
pub const APP_JSON_LINES: &'static str = "application/jsonlines";
pub const APP_OCTET: &'static str = "application/octet-stream";
pub const ACCEPT_ALL: &'static str = "*/*";
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AggQuerySingleChannel {
@@ -83,6 +83,24 @@ impl ScalarType {
Ok(g)
}
pub fn from_bsread_str(s: &str) -> Result<Self, Error> {
use ScalarType::*;
let ret = match s {
"uint8" => U8,
"uint16" => U16,
"uint32" => U32,
"uint64" => U64,
"int8" => I8,
"int16" => I16,
"int32" => I32,
"int64" => I64,
"float" => F32,
"double" => F64,
_ => return Err(Error::with_msg_no_trace(format!("can not understand bsread {}", s))),
};
Ok(ret)
}
pub fn bytes(&self) -> u8 {
use ScalarType::*;
match self {
@@ -356,6 +374,14 @@ impl ByteOrder {
}
}
pub fn from_bsread_str(s: &str) -> Result<ByteOrder, Error> {
match s {
"little" => Ok(ByteOrder::LE),
"big" => Ok(ByteOrder::BE),
_ => Err(Error::with_msg_no_trace(format!("can not understand {}", s))),
}
}
pub fn is_le(&self) -> bool {
if let Self::LE = self {
true
@@ -399,6 +425,26 @@ pub enum Shape {
Image(u32, u32),
}
impl Shape {
pub fn from_bsread_jsval(v: &JsVal) -> Result<Shape, Error> {
match v {
JsVal::Array(v) => match v.len() {
0 => Ok(Shape::Scalar),
1 => match &v[0] {
JsVal::Number(v) => match v.as_u64() {
Some(0) | Some(1) => Ok(Shape::Scalar),
Some(v) => Ok(Shape::Wave(v as u32)),
None => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))),
},
_ => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))),
},
_ => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))),
},
_ => Err(Error::with_msg_no_trace(format!("can not understand {:?}", v))),
}
}
}
pub trait HasShape {
fn shape(&self) -> Shape;
}
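
Taken together, the three bsread parsers added to netpod map one channel descriptor onto typed metadata; a small sketch with sample inputs matching the arms above (int64 to I64, little to LE, [2048] to Wave(2048)):

use err::Error;
use netpod::{ByteOrder, ScalarType, Shape};
use serde_json::json;

// Decode the metadata of a single bsread channel descriptor.
fn channel_meta() -> Result<(ScalarType, ByteOrder, Shape), Error> {
    let sty = ScalarType::from_bsread_str("int64")?;
    let bo = ByteOrder::from_bsread_str("little")?;
    let shape = Shape::from_bsread_jsval(&json!([2048]))?;
    Ok((sty, bo, shape))
}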