From 09724fd540d0e821f99b9c33b649754816cb2625 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 9 Jul 2021 21:34:03 +0200 Subject: [PATCH] Start fetching events --- Cargo.toml | 2 +- archapp/src/events.rs | 216 +++++++++++++++++++++++++++++++---- archapp/src/parse.rs | 78 +++++++++---- archapp_wrap/src/lib.rs | 8 +- disk/Cargo.toml | 1 + disk/src/binned/prebinned.rs | 35 +++--- disk/src/channelexec.rs | 10 ++ err/src/lib.rs | 8 ++ httpclient/Cargo.toml | 22 ++++ httpclient/src/lib.rs | 27 +++++ httpret/src/lib.rs | 35 ++++-- netpod/src/lib.rs | 35 +++++- parse/src/channelconfig.rs | 21 +++- 13 files changed, 412 insertions(+), 86 deletions(-) create mode 100644 httpclient/Cargo.toml create mode 100644 httpclient/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index d0508a8..07ee74e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet"] +members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient"] [profile.release] debug = 1 diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 048d3b5..8d7235f 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -1,40 +1,157 @@ use crate::parse::PbFileReader; +use crate::EventsItem; +use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; -use items::Framable; +use items::eventvalues::EventValues; +use items::{Framable, RangeCompletableItem, StreamItem}; +use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{ArchiverAppliance, Channel, ChannelInfo, NodeConfigCached, Shape}; +use netpod::timeunits::DAY; +use netpod::{ArchiverAppliance, Channel, ChannelInfo, ScalarType, Shape}; use serde_json::Value as JsonValue; +use std::path::PathBuf; use std::pin::Pin; +use tokio::fs::{read_dir, File}; -pub async fn make_event_pipe( - _evq: &RawEventsQuery, - _aa: &ArchiverAppliance, -) -> Result> + Send>>, Error> { - err::todoval() +struct DataFilename { + year: u32, + month: u32, } -pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) -> Result { +fn parse_data_filename(s: &str) -> Result { + if !s.ends_with(".pb") { + return Err(Error::with_msg_no_trace("not a .pb file")); + } + if s.len() < 12 { + return Err(Error::with_msg_no_trace("filename too short")); + } + let j = &s[s.len() - 11..]; + if &j[0..1] != ":" { + return Err(Error::with_msg_no_trace("no colon")); + } + if &j[5..6] != "_" { + return Err(Error::with_msg_no_trace("no underscore")); + } + let year: u32 = j[1..5].parse()?; + let month: u32 = j[6..8].parse()?; + let ret = DataFilename { year, month }; + Ok(ret) +} + +pub async fn make_event_pipe( + evq: &RawEventsQuery, + aa: &ArchiverAppliance, +) -> Result> + Send>>, Error> { + info!("make_event_pipe {:?}", evq); + let evq = evq.clone(); + let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, aa)?; + let channel_info = channel_info(&evq.channel, aa).await?; + //let dtbeg = Utc.timestamp((evq.range.beg / 1000000000) as i64, (evq.range.beg % 1000000000) as u32); + let (tx, rx) = async_channel::bounded(16); + let block1 = async move { + let mut rd = tokio::fs::read_dir(&dir).await?; + while let Some(de) = rd.next_entry().await? { + let s = de.file_name().to_string_lossy().into_owned(); + if s.starts_with(&prefix) && s.ends_with(".pb") { + match parse_data_filename(&s) { + Ok(df) => { + let ts0 = Utc.ymd(df.year as i32, df.month, 0).and_hms(0, 0, 0); + let ts1 = ts0.timestamp() as u64 * 1000000000 + ts0.timestamp_subsec_nanos() as u64; + if evq.range.beg < ts1 + DAY * 32 && evq.range.end > ts1 { + let f1 = File::open(de.path()).await?; + info!("opened {:?}", de.path()); + let mut pbr = PbFileReader::new(f1).await; + pbr.read_header().await?; + loop { + match pbr.read_msg().await { + Ok(ev) => match ev { + EventsItem::ScalarDouble(h) => { + // + let (x, y) = h + .tss + .into_iter() + .zip(h.values.into_iter()) + .filter_map(|(j, k)| { + if j < evq.range.beg || j >= evq.range.end { + None + } else { + Some((j, k)) + } + }) + .fold((vec![], vec![]), |(mut a, mut b), (j, k)| { + a.push(j); + b.push(k); + (a, b) + }); + let b = EventValues { tss: x, values: y }; + let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(b))); + tx.send(Box::new(b) as Box).await?; + } + _ => { + // + error!("case not covered"); + return Err(Error::with_msg_no_trace("todo")); + } + }, + Err(e) => {} + } + } + } + } + Err(e) => {} + } + } + } + Ok::<_, Error>(()) + }; + let block2 = async move { + match block1.await { + Ok(_) => { + info!("block1 done ok"); + } + Err(e) => { + error!("{:?}", e); + } + } + }; + tokio::task::spawn(block2); + Ok(Box::pin(rx)) +} + +struct DirAndPrefix { + dir: PathBuf, + prefix: String, +} + +fn directory_for_channel_files(channel: &Channel, aa: &ArchiverAppliance) -> Result { // SARUN11/CVME/DBLM546/IOC_CPU_LOAD // SARUN11-CVME-DBLM546:IOC_CPU_LOAD let a: Vec<_> = channel.name.split("-").map(|s| s.split(":")).flatten().collect(); - let path1 = node_config - .node - .archiver_appliance - .as_ref() - .unwrap() - .data_base_path - .clone(); - let path2 = a.iter().take(a.len() - 1).fold(path1, |a, &x| a.join(x)); + let path = aa.data_base_path.clone(); + let path = a.iter().take(a.len() - 1).fold(path, |a, &x| a.join(x)); + let ret = DirAndPrefix { + dir: path, + prefix: a + .last() + .ok_or_else(|| Error::with_msg_no_trace("no prefix in file"))? + .to_string(), + }; + Ok(ret) +} + +pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result { + let DirAndPrefix { dir, prefix } = directory_for_channel_files(channel, aa)?; let mut msgs = vec![]; - msgs.push(format!("a: {:?}", a)); - msgs.push(format!("path2: {}", path2.to_string_lossy())); - let mut rd = tokio::fs::read_dir(&path2).await?; + msgs.push(format!("path: {}", dir.to_string_lossy())); + let mut scalar_type = None; + let mut shape = None; + let mut rd = read_dir(&dir).await?; while let Some(de) = rd.next_entry().await? { let s = de.file_name().to_string_lossy().into_owned(); - if s.starts_with(a.last().unwrap()) && s.ends_with(".pb") { + if s.starts_with(&prefix) && s.ends_with(".pb") { msgs.push(s); - let f1 = tokio::fs::File::open(de.path()).await?; + let f1 = File::open(de.path()).await?; let mut pbr = PbFileReader::new(f1).await; pbr.read_header().await?; msgs.push(format!("got header {}", pbr.channel_name())); @@ -42,6 +159,57 @@ pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) -> match ev { Ok(item) => { msgs.push(format!("got event {:?}", item)); + shape = Some(match &item { + EventsItem::ScalarByte(_) => Shape::Scalar, + EventsItem::ScalarShort(_) => Shape::Scalar, + EventsItem::ScalarInt(_) => Shape::Scalar, + EventsItem::ScalarFloat(_) => Shape::Scalar, + EventsItem::ScalarDouble(_) => Shape::Scalar, + EventsItem::WaveByte(item) => Shape::Wave( + item.vals + .first() + .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? + .len() as u32, + ), + EventsItem::WaveShort(item) => Shape::Wave( + item.vals + .first() + .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? + .len() as u32, + ), + EventsItem::WaveInt(item) => Shape::Wave( + item.vals + .first() + .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? + .len() as u32, + ), + EventsItem::WaveFloat(item) => Shape::Wave( + item.vals + .first() + .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? + .len() as u32, + ), + EventsItem::WaveDouble(item) => Shape::Wave( + item.vals + .first() + .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? + .len() as u32, + ), + }); + // These type mappings are defined by the protobuffer schema. + scalar_type = Some(match item { + EventsItem::ScalarByte(_) => ScalarType::U8, + EventsItem::ScalarShort(_) => ScalarType::I32, + EventsItem::ScalarInt(_) => ScalarType::I32, + EventsItem::ScalarFloat(_) => ScalarType::F32, + EventsItem::ScalarDouble(_) => ScalarType::F64, + EventsItem::WaveByte(_) => ScalarType::U8, + EventsItem::WaveShort(_) => ScalarType::I32, + EventsItem::WaveInt(_) => ScalarType::I32, + EventsItem::WaveFloat(_) => ScalarType::F32, + EventsItem::WaveDouble(_) => ScalarType::F64, + }); + break; } Err(e) => { msgs.push(format!("can not read event! {:?}", e)); @@ -50,8 +218,12 @@ pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) -> msgs.push(format!("got header {}", pbr.channel_name())); } } + let shape = shape.ok_or_else(|| Error::with_msg("could not determine shape"))?; + let scalar_type = scalar_type.ok_or_else(|| Error::with_msg("could not determine scalar_type"))?; let ret = ChannelInfo { - shape: Shape::Scalar, + scalar_type, + byte_order: None, + shape, msg: JsonValue::Array(msgs.into_iter().map(JsonValue::String).collect()), }; Ok(ret) diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index 2574ad5..c8adce1 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -2,11 +2,12 @@ use crate::generated::EPICSEvent::PayloadType; use crate::{unescape_archapp_msg, EventsItem}; use archapp_xc::*; use async_channel::{bounded, Receiver}; +use chrono::{TimeZone, Utc}; use err::Error; use items::eventvalues::EventValues; use items::waveevents::WaveEvents; use netpod::log::*; -use netpod::NodeConfigCached; +use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached}; use protobuf::Message; use serde::Serialize; use serde_json::Value as JsonValue; @@ -25,16 +26,32 @@ pub struct PbFileReader { year: u32, } +fn parse_scalar_byte(m: &[u8], year: u32) -> Result { + let msg = crate::generated::EPICSEvent::ScalarByte::parse_from_bytes(m) + .map_err(|_| Error::with_msg(format!("can not parse pb-type {}", "ScalarByte")))?; + let mut t = EventValues:: { + tss: vec![], + values: vec![], + }; + let yd = Utc.ymd(year as i32, 1, 1).and_hms(0, 0, 0); + let ts = yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64; + let v = msg.get_val().first().map_or(0, |k| *k as i32); + t.tss.push(ts); + t.values.push(v); + Ok(EventsItem::ScalarByte(t)) +} + macro_rules! scalar_parse { - ($m:expr, $pbt:ident, $eit:ident, $evty:ident) => {{ + ($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{ let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m) .map_err(|_| Error::with_msg(format!("can not parse pb-type {}", stringify!($pbt))))?; let mut t = EventValues::<$evty> { tss: vec![], values: vec![], }; - // TODO Translate by the file-time-offset. - let ts = msg.get_secondsintoyear() as u64; + let yd = Utc.ymd($year as i32, 1, 1).and_hms(0, 0, 0); + let ts = + yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64; let v = msg.get_val(); t.tss.push(ts); t.values.push(v); @@ -43,15 +60,16 @@ macro_rules! scalar_parse { } macro_rules! wave_parse { - ($m:expr, $pbt:ident, $eit:ident, $evty:ident) => {{ + ($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{ let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m) .map_err(|_| Error::with_msg(format!("can not parse pb-type {}", stringify!($pbt))))?; let mut t = WaveEvents::<$evty> { tss: vec![], vals: vec![], }; - // TODO Translate by the file-time-offset. - let ts = msg.get_secondsintoyear() as u64; + let yd = Utc.ymd($year as i32, 1, 1).and_hms(0, 0, 0); + let ts = + yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64; let v = msg.get_val(); t.tss.push(ts); t.vals.push(v.to_vec()); @@ -59,11 +77,13 @@ macro_rules! wave_parse { }}; } +const MIN_BUF_FILL: usize = 1024 * 16; + impl PbFileReader { pub async fn new(file: File) -> Self { Self { file, - buf: vec![0; 1024 * 128], + buf: vec![0; MIN_BUF_FILL * 4], wp: 0, rp: 0, channel_name: String::new(), @@ -93,42 +113,39 @@ impl PbFileReader { let m = unescape_archapp_msg(&buf[self.rp..k])?; use PayloadType::*; let ei = match self.payload_type { - SCALAR_BYTE => { - //scalar_parse!(&m, ScalarByte, ScalarByte, u8) - err::todoval() - } + SCALAR_BYTE => parse_scalar_byte(&m, self.year)?, SCALAR_ENUM => { - scalar_parse!(&m, ScalarEnum, ScalarInt, i32) + scalar_parse!(&m, self.year, ScalarEnum, ScalarInt, i32) } SCALAR_SHORT => { - scalar_parse!(&m, ScalarShort, ScalarShort, i32) + scalar_parse!(&m, self.year, ScalarShort, ScalarShort, i32) } SCALAR_INT => { - scalar_parse!(&m, ScalarInt, ScalarInt, i32) + scalar_parse!(&m, self.year, ScalarInt, ScalarInt, i32) } SCALAR_FLOAT => { - scalar_parse!(&m, ScalarFloat, ScalarFloat, f32) + scalar_parse!(&m, self.year, ScalarFloat, ScalarFloat, f32) } SCALAR_DOUBLE => { - scalar_parse!(&m, ScalarDouble, ScalarDouble, f64) + scalar_parse!(&m, self.year, ScalarDouble, ScalarDouble, f64) } WAVEFORM_BYTE => { - wave_parse!(&m, VectorChar, WaveByte, u8) + wave_parse!(&m, self.year, VectorChar, WaveByte, u8) } WAVEFORM_SHORT => { - wave_parse!(&m, VectorShort, WaveShort, i32) + wave_parse!(&m, self.year, VectorShort, WaveShort, i32) } WAVEFORM_ENUM => { - wave_parse!(&m, VectorEnum, WaveInt, i32) + wave_parse!(&m, self.year, VectorEnum, WaveInt, i32) } WAVEFORM_INT => { - wave_parse!(&m, VectorInt, WaveInt, i32) + wave_parse!(&m, self.year, VectorInt, WaveInt, i32) } WAVEFORM_FLOAT => { - wave_parse!(&m, VectorFloat, WaveFloat, f32) + wave_parse!(&m, self.year, VectorFloat, WaveFloat, f32) } WAVEFORM_DOUBLE => { - wave_parse!(&m, VectorDouble, WaveDouble, f64) + wave_parse!(&m, self.year, VectorDouble, WaveDouble, f64) } SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => { return Err(Error::with_msg(format!("not supported: {:?}", self.payload_type))); @@ -139,10 +156,10 @@ impl PbFileReader { } async fn fill_buf(&mut self) -> Result<(), Error> { - if self.wp - self.rp >= 1024 * 16 { + if self.wp - self.rp >= MIN_BUF_FILL { return Ok(()); } - if self.rp >= 1024 * 42 { + if self.rp >= self.buf.len() - MIN_BUF_FILL { let n = self.wp - self.rp; for i in 0..n { self.buf[i] = self.buf[self.rp + i]; @@ -397,3 +414,14 @@ pub async fn scan_files_inner( tokio::spawn(block2); Ok(rx) } + +pub async fn channel_config(q: &ChannelConfigQuery, aa: &ArchiverAppliance) -> Result { + let ci = crate::events::channel_info(&q.channel, aa).await?; + let ret = ChannelConfigResponse { + channel: q.channel.clone(), + scalar_type: ci.scalar_type, + byte_order: ci.byte_order, + shape: ci.shape, + }; + Ok(ret) +} diff --git a/archapp_wrap/src/lib.rs b/archapp_wrap/src/lib.rs index 411dfad..0fa2662 100644 --- a/archapp_wrap/src/lib.rs +++ b/archapp_wrap/src/lib.rs @@ -4,7 +4,7 @@ use err::Error; use futures_core::Stream; use items::Framable; use netpod::query::RawEventsQuery; -use netpod::{ArchiverAppliance, Channel, ChannelInfo, NodeConfigCached}; +use netpod::{ArchiverAppliance, Channel, ChannelConfigQuery, ChannelConfigResponse, ChannelInfo, NodeConfigCached}; use std::collections::BTreeMap; use std::future::Future; use std::pin::Pin; @@ -24,5 +24,9 @@ pub async fn make_event_pipe( } pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) -> Result { - archapp::events::channel_info(channel, node_config).await + archapp::events::channel_info(channel, node_config.node.archiver_appliance.as_ref().unwrap()).await +} + +pub async fn channel_config(q: &ChannelConfigQuery, aa: &ArchiverAppliance) -> Result { + archapp::parse::channel_config(q, aa).await } diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 35ba1ba..8077137 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -39,3 +39,4 @@ bitshuffle = { path = "../bitshuffle" } dbconn = { path = "../dbconn" } parse = { path = "../parse" } items = { path = "../items" } +httpclient = { path = "../httpclient" } diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index b3428c3..51154f8 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -11,8 +11,7 @@ use futures_core::Stream; use futures_util::StreamExt; use items::numops::{BoolNum, NumOps}; use items::{Appendable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType}; -use netpod::{AggKind, ByteOrder, NodeConfigCached, ScalarType, Shape}; -use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use netpod::{AggKind, ByteOrder, ChannelConfigQuery, NodeConfigCached, ScalarType, Shape}; use serde::de::DeserializeOwned; use serde::Serialize; use std::pin::Pin; @@ -123,6 +122,9 @@ macro_rules! match_end { }; } +// TODO is the distinction on byte order necessary here? +// We should rely on the "events" http api to deliver data, and the cache, both +// of those have fixed endianness. fn make_num_pipeline( scalar_type: ScalarType, byte_order: ByteOrder, @@ -166,28 +168,17 @@ pub async fn pre_binned_bytes_for_http( )); return Err(err); } - let channel_config = match read_local_config(&query.channel(), &node_config.node).await { - Ok(k) => k, - Err(e) => { - if e.msg().contains("ErrorKind::NotFound") { - let s = futures_util::stream::empty(); - let ret = Box::pin(s); - return Ok(ret); - } else { - return Err(e); - } - } - }; - let entry_res = extract_matching_config_entry(&query.patch().patch_range(), &channel_config)?; - let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found")), - MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")), - MatchingConfigEntry::Entry(entry) => entry, + + let q = ChannelConfigQuery { + channel: query.channel().clone(), + range: query.patch().patch_range(), }; + let conf = httpclient::get_channel_config(&q, node_config).await?; let ret = make_num_pipeline( - entry.scalar_type.clone(), - entry.byte_order.clone(), - entry.to_shape()?, + conf.scalar_type.clone(), + // TODO actually, make_num_pipeline should not depend on endianness. + conf.byte_order.unwrap_or(ByteOrder::LE).clone(), + conf.shape.clone(), query.agg_kind().clone(), query.clone(), node_config, diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 903e905..7c8655a 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -433,3 +433,13 @@ impl ChannelExecFunction for PlainEventsJson { Box::pin(futures_util::stream::empty()) } } + +pub fn dummy_impl() { + let channel: Channel = err::todoval(); + let range: NanoRange = err::todoval(); + let agg_kind: AggKind = err::todoval(); + let node_config: NodeConfigCached = err::todoval(); + let timeout: Duration = err::todoval(); + let f = PlainEventsJson::new(channel.clone(), range.clone(), 0, timeout, node_config.clone(), false); + let _ = channel_exec(f, &channel, &range, agg_kind, &node_config); +} diff --git a/err/src/lib.rs b/err/src/lib.rs index 2947cf5..ee6f25d 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -34,6 +34,14 @@ impl Error { } } + pub fn with_msg_no_trace>(s: S) -> Self { + Self { + msg: s.into(), + trace: None, + trace_str: None, + } + } + pub fn msg(&self) -> &str { &self.msg } diff --git a/httpclient/Cargo.toml b/httpclient/Cargo.toml new file mode 100644 index 0000000..6800ace --- /dev/null +++ b/httpclient/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "httpclient" +version = "0.0.1-a.0" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +serde_json = "1.0" +serde = { version = "1.0", features = ["derive"] } +http = "0.2" +url = "2.2" +tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } +hyper-tls = { version="0.5.0" } +bytes = "1.0.1" +futures-core = "0.3.14" +futures-util = "0.3.14" +tracing = "0.1.25" +async-channel = "1.6" +err = { path = "../err" } +netpod = { path = "../netpod" } +parse = { path = "../parse" } diff --git a/httpclient/src/lib.rs b/httpclient/src/lib.rs new file mode 100644 index 0000000..63fbc80 --- /dev/null +++ b/httpclient/src/lib.rs @@ -0,0 +1,27 @@ +use err::Error; +use hyper::{Body, Method}; +use netpod::{AppendToUrl, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached}; +use url::Url; + +pub async fn get_channel_config( + q: &ChannelConfigQuery, + node_config: &NodeConfigCached, +) -> Result { + let mut url = Url::parse(&format!( + "http://{}:{}/api/4/channel/config", + "localhost", node_config.node.port + ))?; + q.append_to_url(&mut url); + let req = hyper::Request::builder() + .method(Method::GET) + .uri(url.as_str()) + .body(Body::empty())?; + let client = hyper::Client::new(); + let res = client.request(req).await?; + if !res.status().is_success() { + return Err(Error::with_msg("http client error")); + } + let buf = hyper::body::to_bytes(res.into_body()).await?; + let ret: ChannelConfigResponse = serde_json::from_slice(&buf)?; + Ok(ret) +} diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 0eea68a..9617b78 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -13,8 +13,8 @@ use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::log::*; use netpod::{ - channel_from_pairs, get_url_query_pairs, AggKind, Channel, FromUrl, NodeConfigCached, APP_JSON, APP_JSON_LINES, - APP_OCTET, + channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, APP_JSON, + APP_JSON_LINES, APP_OCTET, }; use nodenet::conn::events_service; use panic::{AssertUnwindSafe, UnwindSafe}; @@ -431,11 +431,19 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON { - Ok(plain_events_json(req, node_config).await?) + Ok(plain_events_json(req, node_config).await.map_err(|e| { + error!("{:?}", e); + e + })?) } else if accept == APP_OCTET { - Ok(plain_events_binary(req, node_config).await?) + Ok(plain_events_binary(req, node_config).await.map_err(|e| { + error!("{:?}", e); + e + })?) } else { - Err(Error::with_msg(format!("unexpected Accept: {:?}", accept))) + let e = Error::with_msg(format!("unexpected Accept: {:?}", accept)); + error!("{:?}", e); + Err(e) } } @@ -455,6 +463,7 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) } async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("plain_events_json req: {:?}", req); let (head, _body) = req.into_parts(); let query = PlainEventsJsonQuery::from_request_head(&head)?; let op = disk::channelexec::PlainEventsJson::new( @@ -610,17 +619,19 @@ pub async fn update_search_cache(req: Request, node_config: &NodeConfigCac } pub async fn channel_config(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("channel_config"); let url = Url::parse(&format!("dummy:{}", req.uri()))?; - let pairs = get_url_query_pairs(&url); - let _dry = pairs.contains_key("dry"); - let channel = Channel { - backend: node_config.node.backend.clone(), - name: pairs.get("channelName").unwrap().into(), + //let pairs = get_url_query_pairs(&url); + let q = ChannelConfigQuery::from_url(&url)?; + info!("ChannelConfigQuery {:?}", q); + let conf = if let Some(aa) = &node_config.node.archiver_appliance { + archapp_wrap::channel_config(&q, aa).await? + } else { + parse::channelconfig::channel_config(&q, &node_config.node).await? }; - let res = parse::channelconfig::read_local_config(&channel, &node_config.node).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&res)?))?; + .body(Body::from(serde_json::to_string(&conf)?))?; Ok(ret) } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 604d9e9..7b018db 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -970,9 +970,16 @@ pub fn get_url_query_pairs(url: &Url) -> BTreeMap { BTreeMap::from_iter(url.query_pairs().map(|(j, k)| (j.to_string(), k.to_string()))) } -#[derive(Serialize, Deserialize)] +/** +Request type of the channel/config api. \ +At least on some backends the channel configuration may change depending on the queried range. +Therefore, the query includes the range. +The presence of a configuration in some range does not imply that there is any data available. +*/ +#[derive(Debug, Serialize, Deserialize)] pub struct ChannelConfigQuery { pub channel: Channel, + pub range: NanoRange, } impl HasBackend for ChannelConfigQuery { @@ -990,8 +997,14 @@ impl HasTimeout for ChannelConfigQuery { impl FromUrl for ChannelConfigQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); + let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; + let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; let ret = Self { channel: channel_from_pairs(&pairs)?, + range: NanoRange { + beg: beg_date.parse::>()?.to_nanos(), + end: end_date.parse::>()?.to_nanos(), + }, }; Ok(ret) } @@ -999,12 +1012,30 @@ impl FromUrl for ChannelConfigQuery { impl AppendToUrl for ChannelConfigQuery { fn append_to_url(&self, url: &mut Url) { + let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let mut g = url.query_pairs_mut(); g.append_pair("channelBackend", &self.channel.backend); g.append_pair("channelName", &self.channel.name); + g.append_pair( + "begDate", + &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), + ); + g.append_pair( + "endDate", + &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), + ); } } +#[derive(Debug, Serialize, Deserialize)] +pub struct ChannelConfigResponse { + pub channel: Channel, + #[serde(rename = "scalarType")] + pub scalar_type: ScalarType, + pub byte_order: Option, + pub shape: Shape, +} + #[derive(Serialize, Deserialize)] pub struct EventQueryJsonStringFrame(pub String); @@ -1013,6 +1044,8 @@ Provide basic information about a channel, especially it's shape. */ #[derive(Serialize, Deserialize)] pub struct ChannelInfo { + pub scalar_type: ScalarType, + pub byte_order: Option, pub shape: Shape, pub msg: serde_json::Value, } diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 2127981..8181279 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -1,6 +1,8 @@ use err::Error; use netpod::timeunits::MS; -use netpod::{ByteOrder, Channel, NanoRange, Nanos, Node, ScalarType, Shape}; +use netpod::{ + ByteOrder, Channel, ChannelConfigQuery, ChannelConfigResponse, NanoRange, Nanos, Node, ScalarType, Shape, +}; use nom::bytes::complete::take; use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; use nom::Needed; @@ -253,6 +255,23 @@ pub fn parse_config(inp: &[u8]) -> NRes { Ok((inp, ret)) } +pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result { + let conf = read_local_config(&q.channel, node).await?; + let entry_res = extract_matching_config_entry(&q.range, &conf)?; + let entry = match entry_res { + MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found")), + MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")), + MatchingConfigEntry::Entry(entry) => entry, + }; + let ret = ChannelConfigResponse { + channel: q.channel.clone(), + scalar_type: entry.scalar_type.clone(), + byte_order: Some(entry.byte_order.clone()), + shape: entry.to_shape()?, + }; + Ok(ret) +} + pub async fn read_local_config(channel: &Channel, node: &Node) -> Result { let path = node .data_base_path