From 96e047339204bdc6c45801893549eddedc56d33c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 3 Nov 2021 16:00:53 +0100 Subject: [PATCH] Improve search api --- Cargo.toml | 2 +- archapp/src/archeng.rs | 59 ++--- archapp/src/archeng/backreadbuf.rs | 2 +- archapp/src/archeng/blockrefstream.rs | 24 +- archapp/src/archeng/blockstream.rs | 63 +++-- archapp/src/archeng/configs.rs | 259 +++++++++++++++++++++ archapp/src/archeng/datablock.rs | 310 ++++++++++++++++++++----- archapp/src/archeng/indexfiles.rs | 2 - archapp/src/archeng/ringbuf.rs | 2 +- dbconn/src/search.rs | 103 +++++++- fsio/Cargo.toml | 37 +++ fsio/src/fsio.rs | 175 ++++++++++++++ httpret/src/api1.rs | 8 +- httpret/src/channelarchiver.rs | 110 ++++++++- httpret/src/lib.rs | 4 + httpret/src/proxy.rs | 5 +- httpret/src/proxy/api4.rs | 89 +++++++ httpret/src/search.rs | 27 +-- httpret/static/documentation/api4.html | 263 +++++++++++---------- netpod/src/lib.rs | 14 +- taskrun/src/lib.rs | 6 +- 21 files changed, 1285 insertions(+), 279 deletions(-) create mode 100644 archapp/src/archeng/configs.rs create mode 100644 fsio/Cargo.toml create mode 100644 fsio/src/fsio.rs create mode 100644 httpret/src/proxy/api4.rs diff --git a/Cargo.toml b/Cargo.toml index 81c534e..e0e512f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient"] +members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient", "fsio"] [profile.release] opt-level = 1 diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index c02a708..b814ee0 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -2,6 +2,7 @@ pub mod backreadbuf; pub mod blockrefstream; pub mod blockstream; pub mod bufminread; +pub mod configs; pub mod datablock; pub mod datablockstream; pub mod diskio; @@ -18,7 +19,7 @@ use crate::wrap_task; use async_channel::{Receiver, Sender}; use err::Error; use futures_util::StreamExt; -use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem}; +use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ @@ -329,26 +330,26 @@ pub fn list_all_channels(node: &ChannelArchiver) -> Receiver Result { let _timed = Timed::new("channel_config"); let mut type_info = None; - let mut stream = datablockstream::DatablockStream::for_channel_range( - q.range.clone(), - q.channel.clone(), - conf.data_base_paths.clone().into(), - true, - 1, - ); + let stream = blockrefstream::blockref_stream(q.channel.clone(), q.range.clone().clone(), conf.clone()); + let stream = Box::pin(stream); + let stream = blockstream::BlockStream::new(stream, q.range.clone(), 1); + let mut stream = stream; let timed_expand = Timed::new("channel_config EXPAND"); while let Some(item) = stream.next().await { + use blockstream::BlockItem::*; match item { Ok(k) => match k { - StreamItem::DataItem(k) => match k { - RangeCompletableItem::RangeComplete => (), - RangeCompletableItem::Data(k) => { - type_info = Some(k.type_info()); + EventsItem(item) => { + if item.len() > 0 { + type_info = Some(item.type_info()); break; } - }, - StreamItem::Log(_) => (), - StreamItem::Stats(_) => (), + } + JsVal(jsval) => { + if false { + info!("jsval: {}", serde_json::to_string(&jsval)?); + } + } }, Err(e) => { error!("{}", e); @@ -360,25 +361,25 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R if type_info.is_none() { let 
timed_normal = Timed::new("channel_config NORMAL"); warn!("channel_config expand mode returned none"); - let mut stream = datablockstream::DatablockStream::for_channel_range( - q.range.clone(), - q.channel.clone(), - conf.data_base_paths.clone().into(), - false, - u64::MAX, - ); + let stream = blockrefstream::blockref_stream(q.channel.clone(), q.range.clone().clone(), conf.clone()); + let stream = Box::pin(stream); + let stream = blockstream::BlockStream::new(stream, q.range.clone(), 1); + let mut stream = stream; while let Some(item) = stream.next().await { + use blockstream::BlockItem::*; match item { Ok(k) => match k { - StreamItem::DataItem(k) => match k { - RangeCompletableItem::RangeComplete => (), - RangeCompletableItem::Data(k) => { - type_info = Some(k.type_info()); + EventsItem(item) => { + if item.len() > 0 { + type_info = Some(item.type_info()); break; } - }, - StreamItem::Log(_) => (), - StreamItem::Stats(_) => (), + } + JsVal(jsval) => { + if false { + info!("jsval: {}", serde_json::to_string(&jsval)?); + } + } }, Err(e) => { error!("{}", e); diff --git a/archapp/src/archeng/backreadbuf.rs b/archapp/src/archeng/backreadbuf.rs index 27292ae..8dd2ea4 100644 --- a/archapp/src/archeng/backreadbuf.rs +++ b/archapp/src/archeng/backreadbuf.rs @@ -123,6 +123,6 @@ impl fmt::Debug for BackReadBuf { impl Drop for BackReadBuf { fn drop(&mut self) { - info!("Drop {:?}", self); + trace!("Drop {:?}", self); } } diff --git a/archapp/src/archeng/blockrefstream.rs b/archapp/src/archeng/blockrefstream.rs index 99cbaa7..5629931 100644 --- a/archapp/src/archeng/blockrefstream.rs +++ b/archapp/src/archeng/blockrefstream.rs @@ -1,22 +1,21 @@ use crate::archeng::backreadbuf::BackReadBuf; -use crate::archeng::datablock::{read_data2, read_data_1, read_datafile_header, read_datafile_header2}; +use crate::archeng::datablock::{read_data2, read_datafile_header2}; use crate::archeng::indexfiles::{database_connect, unfold_stream, UnfoldExec}; use crate::archeng::indextree::{ - read_datablockref, read_datablockref2, DataheaderPos, Dataref, HeaderVersion, IndexFileBasics, RecordIter, - RecordTarget, + read_datablockref2, DataheaderPos, Dataref, HeaderVersion, IndexFileBasics, RecordIter, RecordTarget, }; use crate::archeng::ringbuf::RingBuf; -use crate::archeng::{open_read, seek, StatsChannel}; +use crate::archeng::{open_read, StatsChannel}; use err::Error; use futures_core::{Future, Stream}; use items::WithLen; #[allow(unused)] use netpod::log::*; -use netpod::{Channel, ChannelArchiver, FilePos, NanoRange}; +use netpod::{Channel, ChannelArchiver, NanoRange}; +#[allow(unused)] use serde::Serialize; use serde_json::Value as JsVal; use std::collections::{BTreeMap, VecDeque}; -use std::io::SeekFrom; use std::path::PathBuf; use std::pin::Pin; use tokio::fs::File; @@ -83,7 +82,7 @@ impl BlockrefStream { match self.steps { Start => { self.steps = SelectIndexFile; - Ok(Some((BlockrefItem::JsVal(JsVal::Null), self))) + Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("START"))), self))) } SelectIndexFile => { let dbc = database_connect(&self.conf.database).await?; @@ -93,7 +92,7 @@ impl BlockrefStream { self.paths.push_back(row.try_get(0)?); } self.steps = SetupNextPath; - Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("INIT"))), self))) + Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("DBQUERY"))), self))) } SetupNextPath => { let stats = &StatsChannel::dummy(); @@ -115,8 +114,11 @@ impl BlockrefStream { }; Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("NEXTPATH"))), self))) } else { - self.steps = 
Done; - Ok(Some((BlockrefItem::JsVal(JsVal::String(format!("DONE"))), self))) + self.steps = SelectIndexFile; + Ok(Some(( + BlockrefItem::JsVal(JsVal::String(format!("PATHQUEUEEMPTY"))), + self, + ))) } } ReadBlocks(ref mut iter, ref hver, ref indexpath) => { @@ -208,7 +210,7 @@ impl BlockrefStream { panic!(); } } else { - info!( + debug!( "data_bytes_read: {} same_dfh_count: {}", self.data_bytes_read, self.same_dfh_count ); diff --git a/archapp/src/archeng/blockstream.rs b/archapp/src/archeng/blockstream.rs index 08b59be..a07e1ed 100644 --- a/archapp/src/archeng/blockstream.rs +++ b/archapp/src/archeng/blockstream.rs @@ -58,6 +58,7 @@ struct Reader { impl Reader {} struct FutAItem { + #[allow(unused)] fname: String, path: PathBuf, dfnotfound: bool, @@ -67,6 +68,7 @@ struct FutAItem { events: Option, } +#[allow(unused)] pub struct FutA { fname: String, pos: DataheaderPos, @@ -76,13 +78,17 @@ pub struct FutA { impl Future for FutA { type Output = Result; + #[allow(unused)] fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; err::todoval() } } -pub enum BlockItem {} +pub enum BlockItem { + EventsItem(EventsItem), + JsVal(JsVal), +} pub struct BlockStream { inp: S, @@ -115,7 +121,7 @@ impl BlockStream { range, dfnotfound: BTreeMap::new(), block_reads: FuturesOrdered::new(), - max_reads, + max_reads: max_reads.max(1), readers: VecDeque::new(), last_dfname: String::new(), last_dfhpos: DataheaderPos(u64::MAX), @@ -143,7 +149,7 @@ impl Stream for BlockStream where S: Stream> + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -195,7 +201,7 @@ where Some(reader) } else { let stats = StatsChannel::dummy(); - info!("open new reader file {:?}", dpath); + debug!("open new reader file {:?}", dpath); match open_read(dpath.clone(), &stats).await { Ok(file) => { // @@ -212,9 +218,17 @@ where let rp1 = reader.rb.bytes_read(); let dfheader = read_datafile_header2(&mut reader.rb, pos).await?; + // TODO handle expand + let expand = false; let data = - read_data2(&mut reader.rb, &dfheader, range, false) - .await?; + read_data2(&mut reader.rb, &dfheader, range, expand) + .await + .map_err(|e| { + Error::with_msg_no_trace(format!( + "dpath {:?} error {}", + dpath, e + )) + })?; let rp2 = reader.rb.bytes_read(); let bytes_read = rp2 - rp1; let ret = FutAItem { @@ -248,7 +262,7 @@ where } Int::Empty } - BlockrefItem::JsVal(_jsval) => Int::Empty, + BlockrefItem::JsVal(jsval) => Int::Item(Ok(BlockItem::JsVal(jsval))), }, Err(e) => { self.done = true; @@ -271,7 +285,6 @@ where } else { match self.block_reads.poll_next_unpin(cx) { Ready(Some(Ok(item))) => { - // if item.dfnotfound { self.dfnotfound.insert(item.path, true); } @@ -297,23 +310,35 @@ where item.events.is_some(), item.events_read )); + let _ = item; } - if self.acc.older(Duration::from_millis(1000)) { - let ret = std::mem::replace(&mut self.acc, StatsAcc::new()); - match serde_json::to_value((ret, self.block_reads.len(), self.readers.len())) { - Ok(item) => Int::Item(Ok(item)), - Err(e) => { - self.done = true; - return Ready(Some(Err(e.into()))); + if false { + // TODO emit proper variant for optional performance measurement. 
+ if self.acc.older(Duration::from_millis(1000)) { + let ret = std::mem::replace(&mut self.acc, StatsAcc::new()); + match serde_json::to_value((ret, self.block_reads.len(), self.readers.len())) { + Ok(item) => Int::Item(Ok::<_, Error>(item)), + Err(e) => { + self.done = true; + return Ready(Some(Err(e.into()))); + } } - } + } else { + //Int::Item(Ok(item)) + Int::Empty + }; + err::todoval() } else { - //Int::Item(Ok(item)) - Int::Empty + if let Some(events) = item.events { + Int::Item(Ok(BlockItem::EventsItem(events))) + } else { + Int::Empty + } } } Ready(Some(Err(e))) => { self.done = true; + error!("{}", e); Int::Item(Err(e)) } Ready(None) => { @@ -364,6 +389,6 @@ impl fmt::Debug for BlockStream { impl Drop for BlockStream { fn drop(&mut self) { - info!("Drop {:?}", self); + trace!("Drop {:?}", self); } } diff --git a/archapp/src/archeng/configs.rs b/archapp/src/archeng/configs.rs new file mode 100644 index 0000000..345520f --- /dev/null +++ b/archapp/src/archeng/configs.rs @@ -0,0 +1,259 @@ +use crate::archeng::indexfiles::database_connect; +use err::Error; +use futures_core::{Future, Stream}; +use futures_util::{FutureExt, StreamExt}; +use netpod::log::*; +use netpod::{Channel, ChannelArchiver, ChannelConfigQuery, ChannelConfigResponse, Database, NanoRange}; +use serde::Serialize; +use serde_json::Value as JsVal; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, SystemTime}; +use tokio_postgres::{Client, Row}; + +pub struct ChannelNameStream { + db_config: Database, + off: u64, + db_done: bool, + batch: VecDeque, + connect_fut: Option> + Send>>>, + select_fut: Option, Error>> + Send>>>, + done: bool, + complete: bool, +} + +impl ChannelNameStream { + pub fn new(db_config: Database) -> Self { + Self { + db_config, + off: 0, + db_done: false, + batch: VecDeque::new(), + connect_fut: None, + select_fut: None, + done: false, + complete: false, + } + } +} + +impl Stream for ChannelNameStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.complete { + panic!("poll on complete") + } else if self.done { + self.complete = true; + Ready(None) + } else if let Some(item) = self.batch.pop_front() { + Ready(Some(Ok(item))) + } else if let Some(fut) = &mut self.select_fut { + match fut.poll_unpin(cx) { + Ready(Ok(rows)) => { + self.select_fut = None; + self.off += rows.len() as u64; + if rows.len() == 0 { + self.db_done = true; + } + for row in rows { + self.batch.push_back(row.get(1)); + } + continue; + } + Ready(Err(e)) => { + self.select_fut = None; + self.done = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + } else if let Some(fut) = &mut self.connect_fut { + match fut.poll_unpin(cx) { + Ready(Ok(dbc)) => { + self.connect_fut = None; + let off = self.off as i64; + let fut = async move { + let rows = dbc + .query( + "select rowid, name from channels where config = '{}'::jsonb order by name offset $1 limit 1000", + &[&off], + ) + .await?; + Ok::<_, Error>(rows) + }; + self.select_fut = Some(Box::pin(fut)); + continue; + } + Ready(Err(e)) => { + self.connect_fut = None; + self.done = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + } else { + if self.db_done { + self.done = true; + info!("db_done"); + continue; + } else { + let db = self.db_config.clone(); + let fut = async move { database_connect(&db).await }; + self.connect_fut = Some(Box::pin(fut)); + continue; + } + }; + } + } +} + +enum Res { + TimedOut(String), + 
Response(ChannelConfigResponse), +} + +#[derive(Debug, Serialize)] +pub enum ConfigItem { + Config(ChannelConfigResponse), + JsVal(JsVal), +} + +pub struct ConfigStream { + conf: ChannelArchiver, + inp: ChannelNameStream, + inp_done: bool, + get_fut: Option> + Send>>>, + update_fut: Option> + Send>>>, + done: bool, + complete: bool, +} + +impl ConfigStream { + pub fn new(inp: ChannelNameStream, conf: ChannelArchiver) -> Self { + Self { + conf, + inp, + inp_done: false, + get_fut: None, + update_fut: None, + done: false, + complete: false, + } + } +} + +impl Stream for ConfigStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.complete { + panic!("poll on complete") + } else if self.done { + self.complete = true; + Ready(None) + } else if let Some(fut) = &mut self.update_fut { + match fut.poll_unpin(cx) { + Ready(Ok(_)) => { + self.update_fut = None; + continue; + } + Ready(Err(e)) => { + self.update_fut = None; + self.done = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + } else if let Some(fut) = &mut self.get_fut { + match fut.poll_unpin(cx) { + Ready(Ok(Res::Response(item))) => { + self.get_fut = None; + let name = item.channel.name.clone(); + let dbconf = self.conf.database.clone(); + let config = serde_json::to_value(&item)?; + let fut = async move { + let dbc = database_connect(&dbconf).await?; + dbc.query("update channels set config = $2 where name = $1", &[&name, &config]) + .await?; + Ok(()) + }; + self.update_fut = Some(Box::pin(fut)); + let item = ConfigItem::Config(item); + Ready(Some(Ok(item))) + } + Ready(Ok(Res::TimedOut(name))) => { + self.get_fut = None; + let dbconf = self.conf.database.clone(); + let config = serde_json::to_value(&"TimedOut")?; + let fut = async move { + let dbc = database_connect(&dbconf).await?; + dbc.query("update channels set config = $2 where name = $1", &[&name, &config]) + .await?; + Ok(()) + }; + self.update_fut = Some(Box::pin(fut)); + continue; + } + Ready(Err(e)) => { + self.get_fut = None; + self.done = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + } else { + if self.inp_done { + self.done = true; + continue; + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(item))) => { + let conf = self.conf.clone(); + let fut = async move { + let channel = Channel { + name: item, + backend: "".into(), + }; + let now = SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let beg = now - 60 * 60 * 1000; + let end = now + 60 * 60 * 4; + let q = ChannelConfigQuery { + channel, + range: NanoRange { beg, end }, + }; + let fut = super::channel_config(&q, &conf); + let fut = tokio::time::timeout(Duration::from_millis(2000), fut); + match fut.await { + Ok(Ok(k)) => Ok(Res::Response(k)), + Ok(Err(e)) => Err(e), + Err(_) => Ok(Res::TimedOut(q.channel.name)), + } + }; + self.get_fut = Some(Box::pin(fut)); + continue; + } + Ready(Some(Err(e))) => { + self.done = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.inp_done = true; + info!("ConfigStream input done."); + continue; + } + Pending => Pending, + } + } + }; + } + } +} diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs index 6b68699..87bc5cb 100644 --- a/archapp/src/archeng/datablock.rs +++ b/archapp/src/archeng/datablock.rs @@ -1,9 +1,12 @@ +use super::format_hex_block; +use super::indextree::DataheaderPos; use crate::archeng::ringbuf::RingBuf; use crate::archeng::{read_exact, read_string, readf64, readu16, 
readu32, seek, StatsChannel, EPICS_EPOCH_OFFSET}; use crate::eventsitem::EventsItem; -use crate::plainevents::{PlainEvents, ScalarPlainEvents}; +use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; use err::Error; use items::eventvalues::EventValues; +use items::waveevents::WaveEvents; use netpod::log::*; use netpod::timeunits::SEC; use netpod::{NanoRange, Nanos}; @@ -11,13 +14,17 @@ use std::convert::TryInto; use std::io::SeekFrom; use tokio::fs::File; -use super::indextree::DataheaderPos; - -#[derive(Debug)] +#[derive(Clone, Debug)] enum DbrType { DbrString = 0, - DbrInt = 1, + DbrShort = 1, DbrStsFloat = 9, + DbrTimeString = 14, + DbrTimeShort = 15, + DbrTimeFloat = 16, + DbrTimeEnum = 17, + DbrTimeChar = 18, + DbrTimeLong = 19, DbrTimeDouble = 20, } @@ -26,8 +33,14 @@ impl DbrType { use DbrType::*; let res = match k { 0 => DbrString, - 1 => DbrInt, + 1 => DbrShort, 9 => DbrStsFloat, + 14 => DbrTimeString, + 15 => DbrTimeShort, + 16 => DbrTimeFloat, + 17 => DbrTimeEnum, + 18 => DbrTimeChar, + 19 => DbrTimeLong, 20 => DbrTimeDouble, _ => { let msg = format!("not a valid/supported dbr type: {}", k); @@ -37,16 +50,60 @@ impl DbrType { Ok(res) } - #[allow(dead_code)] - fn byte_len(&self) -> usize { + fn meta_len(&self) -> usize { use DbrType::*; match self { DbrString => 0, - DbrInt => 4, - DbrStsFloat => 1, - DbrTimeDouble => 16, + DbrShort => 0, + DbrStsFloat => 4, + DbrTimeString => 12, + DbrTimeShort => 12, + DbrTimeFloat => 12, + DbrTimeEnum => 12, + DbrTimeChar => 12, + DbrTimeLong => 12, + DbrTimeDouble => 12, } } + + fn pad_meta(&self) -> usize { + use DbrType::*; + match self { + DbrString => 0, + DbrShort => 0, + DbrStsFloat => 0, + DbrTimeString => 0, + DbrTimeShort => 2, + DbrTimeFloat => 0, + DbrTimeEnum => 2, + DbrTimeChar => 3, + DbrTimeLong => 0, + DbrTimeDouble => 4, + } + } + + fn val_len(&self) -> usize { + use DbrType::*; + match self { + DbrString => 40, + DbrShort => 2, + DbrStsFloat => 4, + DbrTimeString => 40, + DbrTimeShort => 2, + DbrTimeFloat => 4, + DbrTimeEnum => 2, + DbrTimeChar => 1, + DbrTimeLong => 4, + DbrTimeDouble => 8, + } + } + + fn msg_len(&self, count: usize) -> usize { + let n = self.meta_len() + self.pad_meta() + count * self.val_len(); + let r = n % 8; + let n = if r == 0 { n } else { n + 8 - r }; + n + } } #[derive(Debug)] @@ -204,6 +261,148 @@ pub async fn read_datafile_header2(rb: &mut RingBuf, pos: DataheaderPos) - Ok(ret) } +trait MetaParse { + fn parse_meta(buf: &[u8]) -> (u64, usize); +} + +struct NoneMetaParse; + +impl MetaParse for NoneMetaParse { + #[inline(always)] + fn parse_meta(_buf: &[u8]) -> (u64, usize) { + (0, 0) + } +} + +struct TimeMetaParse; + +impl MetaParse for TimeMetaParse { + #[inline(always)] + fn parse_meta(buf: &[u8]) -> (u64, usize) { + let tsa = u32::from_be_bytes(buf[4..8].try_into().unwrap()); + let tsb = u32::from_be_bytes(buf[8..12].try_into().unwrap()); + let ts = tsa as u64 * SEC + tsb as u64 + EPICS_EPOCH_OFFSET; + (ts, 12) + } +} + +#[inline(always)] +fn parse_msg( + buf: &[u8], + _meta_parse: MP, + dbrt: DbrType, + dbrcount: usize, + valf: F, +) -> Result<(u64, VT, usize), Error> +where + F: Fn(&[u8], usize) -> VT, +{ + let (ts, n) = MP::parse_meta(buf); + let buf = &buf[n + dbrt.pad_meta()..]; + Ok((ts, valf(buf, dbrcount), n)) +} + +macro_rules! ex_s { + ($sty:ident, $n:ident) => { + fn $n(buf: &[u8], _dbrcount: usize) -> $sty { + const R: usize = std::mem::size_of::<$sty>(); + $sty::from_be_bytes(buf[0..R].try_into().unwrap()) + } + }; +} + +macro_rules! 
ex_v { + ($sty:ident, $n:ident) => { + fn $n(mut buf: &[u8], dbrcount: usize) -> Vec<$sty> { + const R: usize = std::mem::size_of::<$sty>(); + let mut a = Vec::with_capacity(dbrcount); + for _ in 0..dbrcount { + let v = $sty::from_be_bytes(buf[0..R].try_into().unwrap()); + a.push(v); + buf = &buf[R..]; + } + a + } + }; +} + +ex_s!(i8, ex_s_i8); +ex_s!(i16, ex_s_i16); +ex_s!(i32, ex_s_i32); +ex_s!(f32, ex_s_f32); +ex_s!(f64, ex_s_f64); + +ex_v!(i8, ex_v_i8); +ex_v!(i16, ex_v_i16); +ex_v!(i32, ex_v_i32); +ex_v!(f32, ex_v_f32); +ex_v!(f64, ex_v_f64); + +macro_rules! read_msg { + ($sty:ident, $exfs:ident, $exfv:ident, $evvar:ident, $rb:expr, $msglen:expr, $numsamples:expr, $dbrt:expr, $dbrcount:ident) => { + if $dbrcount == 1 { + let mut evs = EventValues::empty(); + for _ in 0..$numsamples { + $rb.fill_min($msglen).await?; + let buf = $rb.data(); + let (ts, val, _) = parse_msg(buf, TimeMetaParse, $dbrt.clone(), $dbrcount, $exfs)?; + evs.tss.push(ts); + evs.values.push(val); + $rb.adv($msglen); + } + let evs = ScalarPlainEvents::$evvar(evs); + let plain = PlainEvents::Scalar(evs); + let item = EventsItem::Plain(plain); + item + } else { + let mut evs = WaveEvents::empty(); + for _ in 0..$numsamples { + $rb.fill_min($msglen).await?; + let buf = $rb.data(); + let (ts, val, _) = parse_msg(buf, TimeMetaParse, $dbrt.clone(), $dbrcount, $exfv)?; + evs.tss.push(ts); + evs.vals.push(val); + $rb.adv($msglen); + } + let evs = WavePlainEvents::$evvar(evs); + let plain = PlainEvents::Wave(evs); + let item = EventsItem::Plain(plain); + item + } + }; +} + +async fn _format_debug_1(rb: &mut RingBuf, dbrcount: usize) -> Result<(), Error> { + rb.fill_min(1024 * 10).await?; + for i1 in 0..19 { + let hex = format_hex_block(&rb.data()[512 * i1..], 512); + error!("dbrcount {} block\n{}", dbrcount, hex); + } + return Err(Error::with_msg_no_trace("EXIT")); +} + +fn _format_debug_2(evs: WaveEvents) -> Result<(), Error> { + info!("tss: {:?}", evs.tss); + let n = evs.vals.len(); + let vals: Vec<_> = evs + .vals + .iter() + .enumerate() + .filter(|&(i, _)| i < 3 || i + 3 >= n) + .map(|(_i, j)| { + if j.len() > 6 { + let mut a = j[0..3].to_vec(); + a.extend_from_slice(&j[j.len() - 3..]); + a.to_vec() + } else { + j.to_vec() + } + }) + .collect(); + info!("vals: {:?}", vals); + Ok(()) +} + pub async fn read_data2( rb: &mut RingBuf, datafile_header: &DatafileHeader, @@ -211,58 +410,55 @@ pub async fn read_data2( _expand: bool, ) -> Result { // TODO handle expand mode - //let dhpos = datafile_header.pos.0 + DATA_HEADER_LEN_ON_DISK as u64; - //seek(file, SeekFrom::Start(dhpos), stats).await?; - let res = match &datafile_header.dbr_type { - DbrType::DbrTimeDouble => { - if datafile_header.dbr_count == 1 { - trace!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble"); - let mut evs = EventValues { - tss: vec![], - values: vec![], - }; - let n1 = datafile_header.num_samples as usize; - //let n2 = datafile_header.dbr_type.byte_len(); - let n2 = 2 + 2 + 4 + 4 + (4) + 8; - let n3 = n1 * n2; - rb.fill_min(n3).await?; - //let mut buf = vec![0; n3]; - //read_exact(file, &mut buf, stats).await?; - let buf = rb.data(); - let mut p1 = 0; - let mut ntot = 0; - while p1 < n3 - n2 { - let _status = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap()); - p1 += 2; - let _severity = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap()); - p1 += 2; - let ts1a = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap()); - p1 += 4; - let ts1b = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap()); - p1 += 4; - let ts1 = ts1a as u64 * SEC + 
ts1b as u64 + EPICS_EPOCH_OFFSET; - p1 += 4; - let value = f64::from_be_bytes(buf[p1..p1 + 8].try_into().unwrap()); - p1 += 8; - ntot += 1; - if ts1 >= range.beg && ts1 < range.end { - evs.tss.push(ts1); - evs.values.push(value); - } - } - rb.adv(n3); - //info!("parsed block with {} / {} events", ntot, evs.tss.len()); - let evs = ScalarPlainEvents::Double(evs); + { + let dpos = datafile_header.pos.0 + DATA_HEADER_LEN_ON_DISK as u64; + if rb.rp_abs() != dpos { + warn!("read_data2 rb not positioned {} vs {}", rb.rp_abs(), dpos); + rb.seek(dpos).await?; + } + } + let numsamples = datafile_header.num_samples as usize; + let dbrcount = datafile_header.dbr_count; + let dbrt = datafile_header.dbr_type.clone(); + let dbrt = if let DbrType::DbrTimeEnum = dbrt { + DbrType::DbrTimeShort + } else { + dbrt + }; + let msg_len = dbrt.msg_len(dbrcount); + { + if (datafile_header.buf_size as usize) < numsamples * msg_len { + return Err(Error::with_msg_no_trace(format!( + "buffer too small for data {} {} {}", + datafile_header.buf_size, numsamples, msg_len + ))); + } + } + if dbrcount == 0 { + return Err(Error::with_msg_no_trace(format!("unexpected dbrcount {}", dbrcount))); + } + let res = match &dbrt { + DbrType::DbrTimeChar => read_msg!(i8, ex_s_i8, ex_v_i8, Byte, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeShort => read_msg!(i16, ex_s_i16, ex_v_i16, Short, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeLong => read_msg!(i32, ex_s_i32, ex_v_i32, Int, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeFloat => read_msg!(f32, ex_s_f32, ex_v_f32, Float, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeDouble => read_msg!(f64, ex_s_f64, ex_v_f64, Double, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeString => { + if dbrcount == 1 { + // TODO + let evs = ScalarPlainEvents::Byte(EventValues::empty()); let plain = PlainEvents::Scalar(evs); let item = EventsItem::Plain(plain); item } else { - let msg = format!("dbr_count {:?} not yet supported", datafile_header.dbr_count); - error!("{}", msg); - return Err(Error::with_msg_no_trace(msg)); + // TODO + let evs = WavePlainEvents::Double(WaveEvents::empty()); + let plain = PlainEvents::Wave(evs); + let item = EventsItem::Plain(plain); + item } } - _ => { + DbrType::DbrTimeEnum | DbrType::DbrShort | DbrType::DbrString | DbrType::DbrStsFloat => { let msg = format!("Type {:?} not yet supported", datafile_header.dbr_type); error!("{}", msg); return Err(Error::with_msg_no_trace(msg)); diff --git a/archapp/src/archeng/indexfiles.rs b/archapp/src/archeng/indexfiles.rs index 1fbbce5..f95357f 100644 --- a/archapp/src/archeng/indexfiles.rs +++ b/archapp/src/archeng/indexfiles.rs @@ -200,7 +200,6 @@ impl ScanIndexFiles { } } else if rows.len() == 1 { let rid = rows[0].try_get(0)?; - info!("select done: {}", rid); rid } else { return Err(Error::with_msg("not unique")); @@ -360,7 +359,6 @@ impl ScanChannels { } } else if rows.len() == 1 { let rid = rows[0].try_get(0)?; - info!("select done: {}", rid); rid } else { return Err(Error::with_msg("not unique")); diff --git a/archapp/src/archeng/ringbuf.rs b/archapp/src/archeng/ringbuf.rs index 227ac51..adae730 100644 --- a/archapp/src/archeng/ringbuf.rs +++ b/archapp/src/archeng/ringbuf.rs @@ -150,6 +150,6 @@ impl fmt::Debug for RingBuf { impl Drop for RingBuf { fn drop(&mut self) { - info!("Drop {:?}", self); + trace!("Drop {:?}", self); } } diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 3289a00..13b7565 100644 --- a/dbconn/src/search.rs +++ 
b/dbconn/src/search.rs @@ -1,8 +1,9 @@ use crate::create_connection; use err::Error; -use netpod::{ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, NodeConfigCached}; +use netpod::{ChannelArchiver, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, NodeConfigCached}; +use serde_json::Value as JsVal; -pub async fn search_channel( +pub async fn search_channel_databuffer( query: ChannelSearchQuery, node_config: &NodeConfigCached, ) -> Result { @@ -43,11 +44,12 @@ pub async fn search_channel( }, None => vec![], }; + let ty: String = row.get(3); let k = ChannelSearchSingleResult { backend: row.get(7), name: row.get(1), source: row.get(2), - ty: row.get(3), + ty, shape: shape, unit: row.get(5), description: row.get(6), @@ -58,3 +60,98 @@ pub async fn search_channel( let ret = ChannelSearchResult { channels: res }; Ok(ret) } + +pub async fn search_channel_archeng( + query: ChannelSearchQuery, + backend: String, + conf: &ChannelArchiver, +) -> Result { + let sql = format!(concat!( + "select c.name, c.config", + " from channels c", + " where c.name ~* $1", + " order by c.name", + " limit 100" + )); + let cl = create_connection(&conf.database).await?; + let rows = cl.query(sql.as_str(), &[&query.name_regex]).await?; + let mut res = vec![]; + for row in rows { + let name: String = row.get(0); + let config: JsVal = row.get(1); + let st = match config.get("scalarType") { + Some(k) => match k { + JsVal::String(k) => match k.as_str() { + "U8" => "Uint8", + "U16" => "Uint16", + "U32" => "Uint32", + "U64" => "Uint64", + "I8" => "Int8", + "I16" => "Int16", + "I32" => "Int32", + "I64" => "Int64", + "F32" => "Float32", + "F64" => "Float64", + _ => k, + } + .into(), + _ => "", + }, + None => "", + }; + let shape = match config.get("shape") { + Some(k) => match k { + JsVal::String(k) => { + if k == "Scalar" { + vec![] + } else { + return Err(Error::with_msg_no_trace(format!("can not understand {:?}", config))); + } + } + JsVal::Object(k) => match k.get("Wave") { + Some(k) => match k { + JsVal::Number(k) => { + vec![k.as_i64().unwrap_or(u32::MAX as i64) as u32] + } + _ => { + return Err(Error::with_msg_no_trace(format!("can not understand {:?}", config))); + } + }, + None => { + return Err(Error::with_msg_no_trace(format!("can not understand {:?}", config))); + } + }, + _ => { + return Err(Error::with_msg_no_trace(format!("can not understand {:?}", config))); + } + }, + None => vec![], + }; + let k = ChannelSearchSingleResult { + backend: backend.clone(), + name, + source: String::new(), + ty: st.into(), + shape, + unit: String::new(), + description: String::new(), + is_api_0: None, + }; + res.push(k); + } + let ret = ChannelSearchResult { channels: res }; + Ok(ret) +} + +pub async fn search_channel( + query: ChannelSearchQuery, + node_config: &NodeConfigCached, +) -> Result { + if let Some(conf) = node_config.node.channel_archiver.as_ref() { + search_channel_archeng(query, node_config.node.backend.clone(), conf).await + } else if let Some(_conf) = node_config.node.archiver_appliance.as_ref() { + err::todoval() + } else { + search_channel_databuffer(query, node_config).await + } +} diff --git a/fsio/Cargo.toml b/fsio/Cargo.toml new file mode 100644 index 0000000..fb91c6c --- /dev/null +++ b/fsio/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "fsio" +version = "0.0.1-a.1" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/fsio.rs" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_cbor = "0.11.1" +chrono = { 
version = "0.4.19", features = ["serde"] } +tokio = { version = "1.11.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +#tokio-stream = {version = "0.1.5", features = ["fs"]} +#hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } +async-channel = "1.6" +bytes = "1.0.1" +crc32fast = "1.2.1" +arrayref = "0.3.6" +byteorder = "1.4.3" +futures-core = "0.3.14" +futures-util = "0.3.14" +tracing = "0.1.25" +tracing-futures = { version = "0.2.5", features = ["futures-01", "futures-03", "std-future"] } +fs2 = "0.4.3" +libc = "0.2.93" +hex = "0.4.3" +url = "2.2.2" +tiny-keccak = { version = "2.0", features = ["sha3"] } +err = { path = "../err" } +taskrun = { path = "../taskrun" } +netpod = { path = "../netpod" } +bitshuffle = { path = "../bitshuffle" } +items = { path = "../items" } +streams = { path = "../streams" } diff --git a/fsio/src/fsio.rs b/fsio/src/fsio.rs new file mode 100644 index 0000000..4ebb94e --- /dev/null +++ b/fsio/src/fsio.rs @@ -0,0 +1,175 @@ +use err::Error; +use netpod::log::*; +#[allow(unused)] +use std::os::unix::prelude::OpenOptionsExt; +use std::os::unix::prelude::{AsRawFd, OsStrExt}; +use std::path::PathBuf; +use tokio::fs::OpenOptions; + +const BASE: &str = "/data/daqbuffer-testdata"; + +fn fcntl_xlock(file: &mut std::fs::File, beg: i64, cmd: libc::c_int, ty: i32) -> i32 { + unsafe { + let p = libc::flock { + l_type: ty as i16, + l_whence: libc::SEEK_SET as i16, + l_start: beg, + l_len: 8, + l_pid: 0, + }; + libc::fcntl(file.as_raw_fd(), cmd, &p) + } +} + +fn wlock(file: &mut std::fs::File, beg: i64) -> i32 { + fcntl_xlock(file, beg, libc::F_OFD_SETLK, libc::F_WRLCK) +} + +fn rlock(file: &mut std::fs::File, beg: i64) -> i32 { + fcntl_xlock(file, beg, libc::F_OFD_SETLK, libc::F_RDLCK) +} + +fn unlock(file: &mut std::fs::File, beg: i64) -> i32 { + fcntl_xlock(file, beg, libc::F_OFD_SETLK, libc::F_UNLCK) +} + +#[allow(unused)] +async fn lock_1() -> Result<(), Error> { + let path = PathBuf::from(BASE).join("tmp-daq4-f1"); + let mut f1 = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .truncate(false) + .open(path) + .await?; + f1.as_raw_fd(); + + let mx1 = std::sync::Arc::new(tokio::sync::Mutex::new(0usize)); + let mg1 = mx1.lock().await; + + let (tx1, rx2) = std::sync::mpsc::channel(); + let (tx2, rx1) = std::sync::mpsc::channel(); + + let t1 = std::thread::spawn({ + move || { + let path = PathBuf::from(BASE).join("tmp-daq4-f1"); + let mut f1 = std::fs::OpenOptions::new().read(true).write(true).open(&path).unwrap(); + info!("Thread 1 rlock..."); + let ec = rlock(&mut f1, 0); + info!("Thread 1 rlock {}", ec); + tx1.send(1u32).unwrap(); + rx1.recv().unwrap(); + info!("Thread 1 unlock..."); + let ec = unlock(&mut f1, 0); + info!("Thread 1 unlock {}", ec); + tx1.send(1u32).unwrap(); + rx1.recv().unwrap(); + info!("Thread 1 rlock..."); + let ec = rlock(&mut f1, 0); + info!("Thread 1 rlock {}", ec); + tx1.send(1u32).unwrap(); + rx1.recv().unwrap(); + info!("Thread 1 done"); + } + }); + let t2 = std::thread::spawn({ + move || { + let path = PathBuf::from(BASE).join("tmp-daq4-f1"); + let mut f1 = std::fs::OpenOptions::new().read(true).write(true).open(&path).unwrap(); + rx2.recv().unwrap(); + info!("Thread 2 wlock..."); + let ec = wlock(&mut f1, 0); + info!("Thread 2 wlock {}", ec); + tx2.send(1u32).unwrap(); + rx2.recv().unwrap(); + info!("Thread 2 rlock"); + let ec = rlock(&mut f1, 0); + info!("Thread 2 rlock {}", ec); + tx2.send(1u32).unwrap(); + rx2.recv().unwrap(); + 
tx2.send(1u32).unwrap(); + info!("Thread 2 done"); + } + }); + tokio::task::spawn_blocking(move || { + t1.join().map_err(|_| Error::with_msg_no_trace("join error"))?; + t2.join().map_err(|_| Error::with_msg_no_trace("join error"))?; + Ok::<_, Error>(()) + }) + .await??; + Ok(()) +} + +#[allow(unused)] +async fn write_1() -> Result<(), Error> { + let path = PathBuf::from(BASE).join("tmp-daq4-f2"); + let mut f1 = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .truncate(false) + .open(path) + .await?; + unsafe { + let path_d = PathBuf::from(BASE); + let mut path_d_b = path_d.as_os_str().as_bytes().to_vec(); + //info!("path_d_b {:?}", path_d_b); + path_d_b.push(0); + let fdd = libc::open(path_d_b.as_ptr() as *const i8, libc::O_DIRECTORY | libc::O_RDONLY); + if fdd < 0 { + panic!(); + } + let ec = libc::fsync(fdd); + if ec != 0 { + panic!(); + } + let ec = libc::close(fdd); + if ec != 0 { + panic!(); + } + let fd = f1.as_raw_fd(); + let lockparam = libc::flock { + l_type: libc::F_RDLCK as i16, + l_whence: libc::SEEK_SET as i16, + l_start: 0, + l_len: 8, + l_pid: 0, + }; + let ec = libc::fcntl(f1.as_raw_fd(), libc::F_OFD_SETLK, &lockparam); + if ec != 0 { + panic!(); + } + let buf = b"world!"; + let n = libc::pwrite(fd, buf.as_ptr() as *const libc::c_void, buf.len(), 0); + if n != buf.len() as isize { + panic!(); + } + let ec = libc::fsync(fd); + if ec != 0 { + panic!(); + } + let lockparam = libc::flock { + l_type: libc::F_UNLCK as i16, + l_whence: libc::SEEK_SET as i16, + l_start: 0, + l_len: 8, + l_pid: 0, + }; + let ec = libc::fcntl(f1.as_raw_fd(), libc::F_OFD_SETLK, &lockparam); + if ec == 0 { + panic!(); + } + } + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn t1() -> Result<(), Error> { + Ok(taskrun::run(write_1()).unwrap()) + } +} diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 81d91f5..a35026c 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -99,10 +99,12 @@ pub async fn channel_search_list_v1(req: Request, proxy_config: &ProxyConf let (head, reqbody) = req.into_parts(); let bodybytes = hyper::body::to_bytes(reqbody).await?; let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?; - match head.headers.get("accept") { + match head.headers.get(http::header::ACCEPT) { Some(v) => { if v == APP_JSON { let query = ChannelSearchQuery { + // TODO + backend: None, name_regex: query.regex.map_or(String::new(), |k| k), source_regex: query.source_regex.map_or(String::new(), |k| k), description_regex: query.description_regex.map_or(String::new(), |k| k), @@ -190,11 +192,13 @@ pub async fn channel_search_configs_v1( let (head, reqbody) = req.into_parts(); let bodybytes = hyper::body::to_bytes(reqbody).await?; let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?; - match head.headers.get("accept") { + match head.headers.get(http::header::ACCEPT) { Some(v) => { if v == APP_JSON { // Transform the ChannelSearchQueryV1 to ChannelSearchQuery let query = ChannelSearchQuery { + // TODO + backend: None, name_regex: query.regex.map_or(String::new(), |k| k), source_regex: query.source_regex.map_or(String::new(), |k| k), description_regex: query.description_regex.map_or(String::new(), |k| k), diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs index 4663f3f..56be69a 100644 --- a/httpret/src/channelarchiver.rs +++ b/httpret/src/channelarchiver.rs @@ -9,6 +9,7 @@ use netpod::query::RawEventsQuery; use netpod::{get_url_query_pairs, log::*, Channel, NanoRange}; use netpod::{NodeConfigCached, 
APP_JSON_LINES}; use serde::Serialize; +use serde_json::Value as JsVal; use url::Url; fn json_lines_stream(stream: S) -> impl Stream, Error>> @@ -158,6 +159,87 @@ impl ScanChannels { } } +pub struct ChannelNames {} + +impl ChannelNames { + pub fn prefix() -> &'static str { + "/api/4/channelarchiver/channel/names" + } + + pub fn name() -> &'static str { + "ChannelNames" + } + + pub fn should_handle(path: &str) -> Option { + if path.starts_with(Self::prefix()) { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + info!("{} handle uri: {:?}", Self::name(), req.uri()); + let conf = node_config + .node + .channel_archiver + .as_ref() + .ok_or(Error::with_msg_no_trace( + "this node is not configured as channel archiver", + ))?; + use archapp_wrap::archapp::archeng; + let stream = archeng::configs::ChannelNameStream::new(conf.database.clone()); + let stream = json_lines_stream(stream); + Ok(response(StatusCode::OK) + .header(header::CONTENT_TYPE, APP_JSON_LINES) + .body(Body::wrap_stream(stream))?) + } +} + +pub struct ScanConfigs {} + +impl ScanConfigs { + pub fn prefix() -> &'static str { + "/api/4/channelarchiver/scan/configs" + } + + pub fn name() -> &'static str { + "ScanConfigs" + } + + pub fn should_handle(path: &str) -> Option { + if path.starts_with(Self::prefix()) { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + info!("{} handle uri: {:?}", Self::name(), req.uri()); + let conf = node_config + .node + .channel_archiver + .as_ref() + .ok_or(Error::with_msg_no_trace( + "this node is not configured as channel archiver", + ))?; + use archapp_wrap::archapp::archeng; + let stream = archeng::configs::ChannelNameStream::new(conf.database.clone()); + let stream = archeng::configs::ConfigStream::new(stream, conf.clone()); + let stream = json_lines_stream(stream); + Ok(response(StatusCode::OK) + .header(header::CONTENT_TYPE, APP_JSON_LINES) + .body(Body::wrap_stream(stream))?) 
+ } +} + pub struct BlockRefStream {} impl BlockRefStream { @@ -190,9 +272,13 @@ impl BlockRefStream { "this node is not configured as channel archiver", ))?; let range = NanoRange { beg: 0, end: u64::MAX }; + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let pairs = get_url_query_pairs(&url); + let channel_name = pairs.get("channelName").map(String::from).unwrap_or("NONE".into()); let channel = Channel { backend: "".into(), - name: "ARIDI-PCT:CURRENT".into(), + name: channel_name, + //name: "ARIDI-PCT:CURRENT".into(), }; let s = archapp_wrap::archapp::archeng::blockrefstream::blockref_stream(channel, range, conf.clone()); let s = s.map(|item| match item { @@ -251,20 +337,26 @@ impl BlockStream { "this node is not configured as channel archiver", ))?; let range = NanoRange { beg: 0, end: u64::MAX }; - let channel = Channel { - backend: "".into(), - name: "ARIDI-PCT:CURRENT".into(), - }; let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); let read_queue = pairs.get("readQueue").unwrap_or(&"1".to_string()).parse()?; - let s = archapp_wrap::archapp::archeng::blockrefstream::blockref_stream(channel, range.clone(), conf.clone()); + let channel_name = pairs.get("channelName").map(String::from).unwrap_or("NONE".into()); + let channel = Channel { + backend: "".into(), + name: channel_name, + //name: "ARIDI-PCT:CURRENT".into(), + }; + use archapp_wrap::archapp::archeng; + let s = archeng::blockrefstream::blockref_stream(channel, range.clone(), conf.clone()); let s = Box::pin(s); - let s = archapp_wrap::archapp::archeng::blockstream::BlockStream::new(s, range.clone(), read_queue); + let s = archeng::blockstream::BlockStream::new(s, range.clone(), read_queue); let s = s.map(|item| match item { Ok(item) => { - //use archapp_wrap::archapp::archeng::blockstream::BlockItem::*; - Ok(item) + use archeng::blockstream::BlockItem; + match item { + BlockItem::EventsItem(item) => Ok(JsVal::String("EventsItem".into())), + BlockItem::JsVal(jsval) => Ok(jsval), + } } Err(e) => Err(e), }); diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index f50d5c3..27354cd 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -290,6 +290,10 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> h.handle(req, &node_config).await } else if let Some(h) = channelarchiver::ScanChannels::should_handle(path) { h.handle(req, &node_config).await + } else if let Some(h) = channelarchiver::ScanConfigs::should_handle(path) { + h.handle(req, &node_config).await + } else if let Some(h) = channelarchiver::ChannelNames::should_handle(path) { + h.handle(req, &node_config).await } else if let Some(h) = channelarchiver::BlockRefStream::should_handle(path) { h.handle(req, &node_config).await } else if let Some(h) = channelarchiver::BlockStream::should_handle(path) { diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 5be057e..cdd8d5a 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -1,3 +1,5 @@ +pub mod api4; + use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json_2_v1, proxy_distribute_v1}; use crate::gather::{gather_get_json_generic, SubRes}; use crate::{api_1_docs, api_4_docs, response, Cont}; @@ -78,7 +80,8 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else if path == "/api/4/backends" { Ok(backends(req, proxy_config).await?) } else if path == "/api/4/search/channel" { - Ok(channel_search(req, proxy_config).await?) + //Ok(channel_search(req, proxy_config).await?) 
+ Ok(api4::channel_search(req, proxy_config).await?) } else if path == "/api/4/events" { Ok(proxy_single_backend_query::(req, proxy_config).await?) } else if path == "/api/4/binned" { diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs new file mode 100644 index 0000000..12775f9 --- /dev/null +++ b/httpret/src/proxy/api4.rs @@ -0,0 +1,89 @@ +use crate::gather::{gather_get_json_generic, SubRes}; +use crate::response; +use err::Error; +use futures_core::Future; +use http::{header, Request, Response, StatusCode}; +use hyper::Body; +use itertools::Itertools; +use netpod::log::*; +use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; +use std::pin::Pin; +use std::time::Duration; +use url::Url; + +pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { + let (head, _body) = req.into_parts(); + let vdef = header::HeaderValue::from_static(APP_JSON); + let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef); + if v == APP_JSON || v == "*/*" { + let inpurl = Url::parse(&format!("dummy:{}", head.uri))?; + let query = ChannelSearchQuery::from_url(&inpurl)?; + let mut bodies = vec![]; + let urls = proxy_config + .backends + .iter() + .filter(|k| { + if let Some(back) = &query.backend { + back == &k.name + } else { + true + } + }) + .map(|pb| match Url::parse(&format!("{}/api/4/search/channel", pb.url)) { + Ok(mut url) => { + query.append_to_url(&mut url); + Ok(url) + } + Err(_) => Err(Error::with_msg(format!("parse error for: {:?}", pb))), + }) + .fold_ok(vec![], |mut a, x| { + a.push(x); + bodies.push(None); + a + })?; + let tags = urls.iter().map(|k| k.to_string()).collect(); + let nt = |res| { + let fut = async { + let body = hyper::body::to_bytes(res).await?; + //info!("got a result {:?}", body); + let res: ChannelSearchResult = match serde_json::from_slice(&body) { + Ok(k) => k, + Err(_) => { + let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body)); + error!("{}", msg); + return Err(Error::with_msg_no_trace(msg)); + } + }; + Ok(res) + }; + Box::pin(fut) as Pin + Send>> + }; + let ft = |all: Vec>| { + let mut res = vec![]; + for j in all { + for k in j.val.channels { + res.push(k); + } + } + let res = ChannelSearchResult { channels: res }; + let res = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(Body::from(serde_json::to_string(&res)?))?; + Ok(res) + }; + let ret = gather_get_json_generic( + http::Method::GET, + urls, + bodies, + tags, + nt, + ft, + Duration::from_millis(3000), + ) + .await?; + Ok(ret) + } else { + info!("bad accept: {:?}", head.headers.get(header::ACCEPT)); + Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::from(format!("{:?}", proxy_config.name)))?) 
+    }
+}
diff --git a/httpret/src/search.rs b/httpret/src/search.rs
index b79a8d6..e1c000c 100644
--- a/httpret/src/search.rs
+++ b/httpret/src/search.rs
@@ -8,18 +8,19 @@ use url::Url;
 
 pub async fn channel_search(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
-    match head.headers.get(header::ACCEPT) {
-        Some(v) if v == APP_JSON => {
-            let s1 = format!("dummy:{}", head.uri);
-            info!("try to parse {:?}", s1);
-            let url = Url::parse(&s1)?;
-            let query = ChannelSearchQuery::from_url(&url)?;
-            info!("search query: {:?}", query);
-            let res = dbconn::search::search_channel(query, node_config).await?;
-            let body = Body::from(serde_json::to_string(&res)?);
-            let ret = super::response(StatusCode::OK).body(body)?;
-            Ok(ret)
-        }
-        _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
+    let vdef = header::HeaderValue::from_static(APP_JSON);
+    let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
+    if v == APP_JSON || v == "*/*" {
+        let s1 = format!("dummy:{}", head.uri);
+        info!("try to parse {:?}", s1);
+        let url = Url::parse(&s1)?;
+        let query = ChannelSearchQuery::from_url(&url)?;
+        info!("search query: {:?}", query);
+        let res = dbconn::search::search_channel(query, node_config).await?;
+        let body = Body::from(serde_json::to_string(&res)?);
+        let ret = super::response(StatusCode::OK).body(body)?;
+        Ok(ret)
+    } else {
+        Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
     }
 }
diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html
index b431b4a..af5dd19 100644
--- a/httpret/static/documentation/api4.html
+++ b/httpret/static/documentation/api4.html

Databuffer API 4 Documentation

Documented here is the databuffer HTTP API 4. The "original" unversioned API is documented at this location.

API version 1: https://data-api.psi.ch/api/1/documentation/

In order to keep the API surface as small as possible in comparison to API 0, we add functionality on demand, so please feel free to create a Jira ticket!

Timestamp format

The result encodes timestamps in the form:

{
  "tsAnchor": 1623909860,                    // Time-anchor of this result in UNIX epoch seconds.
  "tsOffMs": [173, 472, 857, ...],           // Millisecond-offset to tsAnchor for each event/bin-edge.
  "tsOffNs": [422901, 422902, 422903, ...],  // Nanosecond-offset to tsAnchor in addition to tsOffMs for each event/bin-edge.
}

which results in these nanosecond timestamps:

1623909860573422901
1623909875671422902
1623909897932422903

Formally: tsAbsolute = tsAnchor * 10^9 + tsOffMs * 10^6 + tsOffNs

Two reasons lead to this choice of timestamp format:

• JavaScript can not represent the full nanosecond-resolution timestamps in a single numeric variable.
• The lowest 6 digits of the nanosecond timestamp are anyway abused by the timing system to emit a pulse-id.
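The recombination is a one-liner; a minimal sketch in Rust (the function is illustrative, not part of any crate in this repository):

fn ts_absolute(ts_anchor: u64, ts_off_ms: u64, ts_off_ns: u64) -> u64 {
    // tsAbsolute = tsAnchor * 10^9 + tsOffMs * 10^6 + tsOffNs
    ts_anchor * 1_000_000_000 + ts_off_ms * 1_000_000 + ts_off_ns
}

fn main() {
    // Offsets chosen so that the result matches the first timestamp listed above.
    assert_eq!(ts_absolute(1623909860, 573, 422901), 1623909860573422901);
}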

API functions

Currently available functionality:

• List available backends
• Search channel
• Query event data
• Query binned data

List available backends

Method: GET

URL: https://data-api.psi.ch/api/4/backends

Request header: "Accept" must be "application/json"

CURL example:

curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/backends'

Example response:

{
  "backends": [
    "sf-databuffer",
    "hipa-archive",
    "gls-archive",
    "proscan-archive",
    "sls-archive"
  ]
}

Search channel

Method: GET

URL: https://data-api.psi.ch/api/4/search/channel

Query parameters: (all optional)

• backend (e.g. "sf-databuffer", "sls-archive", ... any from the list-backends API)
• nameRegex (e.g. "LSCP.*6")
• sourceRegex (e.g. "178:9999")
• descriptionRegex (e.g. "celsius")

Request header: "Accept" should be "application/json" for forward-compatibility, but can be omitted for e.g. a quick manual search using CURL.

The full channel list is long, so it is encouraged to provide a search string of some minimal length.

CURL example:

curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel?sourceRegex=CV.E.+37&nameRegex=120.+y2$'

Example response:

Keys always present: name, backend.

Other keys are optional; it depends on the data found on disk: sometimes there is an empty string on disk, sometimes that key is missing.

{
  "channels": [
    {
      "name": "S10MA01-DBPM120:Y2",
      ...
    }
  ]
}

The search constraints are AND'd.
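Clients inside this repository can also build the search URL programmatically. A short sketch using the ChannelSearchQuery type from the netpod crate as extended in this patch (the parameter values are just examples):

use netpod::ChannelSearchQuery;
use url::Url;

fn main() -> Result<(), url::ParseError> {
    // Leaving `backend` as None searches across all configured backends.
    let query = ChannelSearchQuery {
        backend: Some("sls-archive".into()),
        name_regex: "120.+y2$".into(),
        source_regex: String::new(),
        description_regex: String::new(),
    };
    let mut url = Url::parse("https://data-api.psi.ch/api/4/search/channel")?;
    query.append_to_url(&mut url);
    // GET this URL with the header `Accept: application/json`.
    println!("{}", url);
    Ok(())
}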

Query event data

Returns the full event values in a given time range.

Method: GET

URL: https://data-api.psi.ch/api/4/events

Query parameters:

• channelBackend (e.g. "sf-databuffer")
• channelName (e.g. "S10CB02-RBOC-DCP10:FOR-AMPLT-AVG")
• begDate (e.g. "2021-05-26T07:10:00.000Z")
• endDate (e.g. "2021-05-26T07:16:00.000Z")

Request header: "Accept" must be "application/json"

Timeout

If the requested range takes too long to retrieve, then the flag timedOut: true will be set.

CURL example:

curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channelBackend=sf-databuffer
  &channelName=S10CB02-RBOC-DCP10:FOR-AMPLT-AVG&begDate=2021-05-26T07:10:00.000Z&endDate=2021-05-26T07:16:00.000Z'

Example response:

{
  "finalisedRange": true,
  "tsAnchor": 1623763172,
  ...
}

Finalised range

If the server can determine that no more data will be added to the requested time range, then it will add the flag finalisedRange: true to the response.

Query binned data

Method: GET

URL: https://data-api.psi.ch/api/4/binned

Query parameters:

• channelBackend (e.g. "sf-databuffer")
• channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")
• begDate (e.g. "2021-05-26T07:10:00.000Z")
• endDate (e.g. "2021-05-26T07:16:00.000Z")
• binCount (number of requested bins in the time dimension, e.g. "6". The actual number of returned bins depends on the bin-cache grid resolution; the server tries to find the best match.)
• binningScheme (optional)
  • if not specified: the default is "binningScheme=unweightedScalar".
  • "binningScheme=unweightedScalar": non-weighted binning; a waveform first gets averaged to a scalar.
  • "binningScheme=timeWeightedScalar": time-weighted binning; a waveform first gets averaged to a scalar.
  • "binningScheme=binnedX&binnedXcount=13": the waveform first gets binned to 13 bins in the X dimension (the waveform dimension).
  • "binningScheme=binnedX&binnedXcount=0": the waveform is not binned in the X dimension but kept at full length.

Request header: "Accept" must be "application/json"

CURL example:

curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channelBackend=sf-databuffer
  &channelName=SLAAR-LSCP4-LAS6891:CH7:1&begDate=2021-05-25T00:00:00.000Z&endDate=2021-05-26T00:00:00.000Z&binCount=3'

Partial result

If the requested range takes a longer time to retrieve, then a partial result with at least one bin is returned. The partial result contains the necessary information to send another request for a range that starts at the first not-yet-retrieved bin; this information is provided by the continueAt and missingBins fields. This enables the user agent to start the presentation to the user while updating the user interface as new bins are received.
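A sketch of the client loop this implies, assuming that continueAt carries a date string that can be passed back as begDate; fetch_json is a hypothetical stand-in for whatever HTTP client is actually in use:

use serde_json::Value;

// Hypothetical helper: stands in for the application's HTTP client.
// It should GET `url` with the header `Accept: application/json`.
async fn fetch_json(url: &str) -> Result<Value, Box<dyn std::error::Error>> {
    let _ = url;
    unimplemented!()
}

// Keep re-requesting until the response no longer contains `continueAt`.
async fn fetch_all_bins(
    base: &str,
    beg_date: &str,
    end_date: &str,
) -> Result<Vec<Value>, Box<dyn std::error::Error>> {
    let mut beg = beg_date.to_owned();
    let mut partials = Vec::new();
    loop {
        let url = format!("{}&begDate={}&endDate={}", base, beg, end_date);
        let res = fetch_json(&url).await?;
        // `continueAt` points at the first not-yet-retrieved bin.
        let cont = res.get("continueAt").and_then(Value::as_str).map(str::to_owned);
        partials.push(res);
        match cont {
            // Re-request, starting at the first missing bin.
            Some(next) => beg = next,
            // No continueAt key: the result is complete.
            None => return Ok(partials),
        }
    }
}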

Example response (without usage of binningScheme):

{
  "avgs": [
    16204.087890625,
    16204.3798828125,
    ...
  ],
  ...
}

Example response (waveform channel and usage of binningScheme):

{
  "tsAnchor": 1623769950,
  "tsMs": [
    0,
    ...
  ],
  ...
}
-

Complete result

If the result does not contain a continueAt key, then the result is complete.

Finalised range

If the server can determine that no more data will be added to the requested time range, then it will add the flag finalisedRange: true to the response.

Feedback and comments very much appreciated!

dominik.werder@psi.ch

or please assign me a JIRA ticket.
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 0d73add..aef1eff 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -1077,6 +1077,7 @@ pub fn channel_from_pairs(pairs: &BTreeMap<String, String>) -> Result<Channel, Error> {
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct ChannelSearchQuery {
+    pub backend: Option<String>,
     pub name_regex: String,
     pub source_regex: String,
     pub description_regex: String,
@@ -1086,6 +1087,7 @@ impl ChannelSearchQuery {
     pub fn from_url(url: &Url) -> Result<Self, Error> {
         let pairs = get_url_query_pairs(url);
         let ret = Self {
+            backend: pairs.get("backend").map(Into::into),
             name_regex: pairs.get("nameRegex").map_or(String::new(), |k| k.clone()),
             source_regex: pairs.get("sourceRegex").map_or(String::new(), |k| k.clone()),
             description_regex: pairs.get("descriptionRegex").map_or(String::new(), |k| k.clone()),
@@ -1094,10 +1096,13 @@ impl ChannelSearchQuery {
     }
 
     pub fn append_to_url(&self, url: &mut Url) {
-        url.query_pairs_mut().append_pair("nameRegex", &self.name_regex);
-        url.query_pairs_mut().append_pair("sourceRegex", &self.source_regex);
-        url.query_pairs_mut()
-            .append_pair("descriptionRegex", &self.description_regex);
+        let mut qp = url.query_pairs_mut();
+        if let Some(v) = &self.backend {
+            qp.append_pair("backend", v);
+        }
+        qp.append_pair("nameRegex", &self.name_regex);
+        qp.append_pair("sourceRegex", &self.source_regex);
+        qp.append_pair("descriptionRegex", &self.description_regex);
     }
 }
 
@@ -1144,6 +1149,7 @@ pub struct ProxyBackend {
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct ProxyConfig {
+    pub name: String,
     pub listen: String,
     pub port: u16,
     pub search_hosts: Vec<String>,
diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs
index 9638254..f058d69 100644
--- a/taskrun/src/lib.rs
+++ b/taskrun/src/lib.rs
@@ -95,9 +95,9 @@ pub fn tracing_init() {
         "archapp::archeng=info",
         "archapp::archeng::datablockstream=info",
         "archapp::archeng::indextree=info",
-        "archapp::archeng::blockstream=trace",
-        "archapp::archeng::ringbuf=trace",
-        "archapp::archeng::backreadbuf=trace",
+        "archapp::archeng::blockstream=info",
+        "archapp::archeng::ringbuf=info",
+        "archapp::archeng::backreadbuf=info",
         "archapp::storagemerge=info",
         "daqbuffer::test=trace",
     ]